diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/fsm.go | 106 | ||||
-rw-r--r-- | server/fsm_test.go | 14 | ||||
-rw-r--r-- | server/peer.go | 1080 | ||||
-rw-r--r-- | server/peer_test.go | 522 | ||||
-rw-r--r-- | server/server.go | 999 |
5 files changed, 1115 insertions, 1606 deletions
diff --git a/server/fsm.go b/server/fsm.go index 0cbdd84e..2ebe8047 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -23,6 +23,7 @@ import ( "gopkg.in/tomb.v2" "io" "net" + "strconv" "time" ) @@ -36,6 +37,7 @@ const ( type fsmMsg struct { MsgType fsmMsgType + MsgSrc string MsgData interface{} } @@ -63,6 +65,7 @@ func (s AdminState) String() string { } type FSM struct { + t tomb.Tomb globalConfig *config.Global peerConfig *config.Neighbor keepaliveTicker *time.Ticker @@ -74,6 +77,8 @@ type FSM struct { negotiatedHoldTime float64 adminState AdminState adminStateCh chan AdminState + getActiveCh chan struct{} + h *FSMHandler } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -124,16 +129,19 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } } -func NewFSM(gConfig *config.Global, pConfig *config.Neighbor, connCh chan net.Conn) *FSM { - return &FSM{ +func NewFSM(gConfig *config.Global, pConfig *config.Neighbor) *FSM { + fsm := &FSM{ globalConfig: gConfig, peerConfig: pConfig, state: bgp.BGP_FSM_IDLE, - connCh: connCh, + connCh: make(chan net.Conn), opensentHoldTime: float64(HOLDTIME_OPENSENT), adminState: ADMIN_STATE_UP, adminStateCh: make(chan AdminState, 1), + getActiveCh: make(chan struct{}), } + fsm.t.Go(fsm.connectLoop) + return fsm } func (fsm *FSM) StateChange(nextState bgp.FSMState) { @@ -144,6 +152,18 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) { "new": nextState.String(), }).Debug("state changed") fsm.state = nextState + switch nextState { + case bgp.BGP_FSM_ESTABLISHED: + fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix() + fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ + case bgp.BGP_FSM_ACTIVE: + if !fsm.peerConfig.TransportOptions.PassiveMode { + fsm.getActiveCh <- struct{}{} + } + fallthrough + default: + fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix() + } } func (fsm *FSM) LocalAddr() net.IP { @@ -179,6 +199,55 @@ func (fsm *FSM) sendNotification(conn net.Conn, code, subType uint8, data []byte fsm.sendNotificatonFromErrorMsg(conn, e.(*bgp.MessageError)) } +func (fsm *FSM) connectLoop() error { + var tick int + if tick = int(fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY { + tick = MIN_CONNECT_RETRY + } + + ticker := time.NewTicker(time.Duration(tick) * time.Second) + ticker.Stop() + + connect := func() { + if bgp.FSMState(fsm.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE { + var host string + addr := fsm.peerConfig.NeighborAddress + + if addr.To4() != nil { + host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT) + } else { + host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT) + } + + conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second) + if err == nil { + fsm.connCh <- conn + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + }).Debugf("failed to connect: %s", err) + } + } + } + + for { + select { + case <-fsm.t.Dying(): + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + }).Debug("stop connect loop") + ticker.Stop() + return nil + case <-ticker.C: + connect() + case <-fsm.getActiveCh: + ticker = time.NewTicker(time.Duration(tick) * time.Second) + } + } +} + type FSMHandler struct { t tomb.Tomb fsm *FSM @@ -203,15 +272,6 @@ func NewFSMHandler(fsm *FSM, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessag return f } -func (h *FSMHandler) Wait() error { - return h.t.Wait() -} - -func (h *FSMHandler) Stop() error { - h.t.Kill(nil) - return h.t.Wait() -} - func (h *FSMHandler) idle() bgp.FSMState { fsm := h.fsm @@ -354,6 +414,7 @@ func (h *FSMHandler) recvMessageWithError() error { }).Warn("malformed BGP Header") h.msgCh <- &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.peerConfig.NeighborAddress.String(), MsgData: err, } return err @@ -381,11 +442,13 @@ func (h *FSMHandler) recvMessageWithError() error { }).Warn("malformed BGP message") fmsg = &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.peerConfig.NeighborAddress.String(), MsgData: err, } } else { fmsg = &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.peerConfig.NeighborAddress.String(), MsgData: m, } if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { @@ -456,6 +519,7 @@ func (h *FSMHandler) opensent() bgp.FSMState { e := &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: fsm.peerConfig.NeighborAddress.String(), MsgData: m, } h.incoming <- e @@ -656,12 +720,12 @@ func (h *FSMHandler) sendMessageloop() error { // connection is closed and tried to kill us, // we need to die immediately. Otherwise fms // doesn't go to idle. - for len(h.outgoing) > 0 { - m := <-h.outgoing - err := send(m) - if err != nil { - return nil - } + // + // we always try to send. in case b), the + // connection was already closed so it + // correctly works in both cases. + if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { + send(bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)) } return nil case m := <-h.outgoing: @@ -759,8 +823,8 @@ func (h *FSMHandler) loop() error { switch fsm.state { case bgp.BGP_FSM_IDLE: nextState = h.idle() - // case bgp.BGP_FSM_CONNECT: - // return h.connect() + // case bgp.BGP_FSM_CONNECT: + // nextState = h.connect() case bgp.BGP_FSM_ACTIVE: nextState = h.active() case bgp.BGP_FSM_OPENSENT: @@ -790,6 +854,7 @@ func (h *FSMHandler) loop() error { if nextState >= bgp.BGP_FSM_IDLE { e := &fsmMsg{ MsgType: FSM_MSG_STATE_CHANGE, + MsgSrc: fsm.peerConfig.NeighborAddress.String(), MsgData: nextState, } h.incoming <- e @@ -800,7 +865,6 @@ func (h *FSMHandler) loop() error { func (h *FSMHandler) changeAdminState(s AdminState) error { fsm := h.fsm if fsm.adminState != s { - log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.peerConfig.NeighborAddress, diff --git a/server/fsm_test.go b/server/fsm_test.go index 8c40d81c..ef962598 100644 --- a/server/fsm_test.go +++ b/server/fsm_test.go @@ -289,19 +289,14 @@ func makePeerAndHandler() (*Peer, *FSMHandler) { p := &Peer{ globalConfig: globalConfig, - peerConfig: neighborConfig, - connCh: make(chan net.Conn), - serverMsgCh: make(chan *serverMsg), - peerMsgCh: make(chan *peerMsg), - getActiveCh: make(chan struct{}), + config: neighborConfig, capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), } - p.siblings = make(map[string]*serverMsgDataPeer) - p.fsm = NewFSM(&globalConfig, &neighborConfig, p.connCh) + p.fsm = NewFSM(&globalConfig, &neighborConfig) - incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH) - p.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH) + incoming := make(chan *fsmMsg, 4096) + p.outgoing = make(chan *bgp.BGPMessage, 4096) h := &FSMHandler{ fsm: p.fsm, @@ -309,7 +304,6 @@ func makePeerAndHandler() (*Peer, *FSMHandler) { incoming: incoming, outgoing: p.outgoing, } - p.t.Go(p.connectLoop) return p, h diff --git a/server/peer.go b/server/peer.go index 63c907ee..f3b6caf0 100644 --- a/server/peer.go +++ b/server/peer.go @@ -17,198 +17,77 @@ package server import ( "encoding/json" - "fmt" log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/policy" "github.com/osrg/gobgp/table" - "gopkg.in/tomb.v2" "net" - "strconv" "time" ) const ( - FSM_CHANNEL_LENGTH = 1024 - FLOP_THRESHOLD = time.Second * 30 - MIN_CONNECT_RETRY = 10 + FLOP_THRESHOLD = time.Second * 30 + MIN_CONNECT_RETRY = 10 ) -type peerMsgType int - -const ( - _ peerMsgType = iota - PEER_MSG_PATH - PEER_MSG_PEER_DOWN -) - -type peerMsg struct { - msgType peerMsgType - msgData interface{} -} - type Peer struct { - t tomb.Tomb globalConfig config.Global - peerConfig config.Neighbor - connCh chan net.Conn - serverMsgCh chan *serverMsg - peerMsgCh chan *peerMsg - getActiveCh chan struct{} + config config.Neighbor fsm *FSM + rfMap map[bgp.RouteFamily]bool + capMap map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface adjRib *table.AdjRib - // peer and rib are always not one-to-one so should not be - // here but it's the simplest and works our first target. - rib *table.TableManager - isGlobalRib bool - rfMap map[bgp.RouteFamily]bool - capMap map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface - peerInfo *table.PeerInfo - siblings map[string]*serverMsgDataPeer - outgoing chan *bgp.BGPMessage - importPolicies []*policy.Policy - defaultImportPolicy config.DefaultPolicyType - exportPolicies []*policy.Policy - defaultExportPolicy config.DefaultPolicyType + peerInfo *table.PeerInfo + outgoing chan *bgp.BGPMessage } -func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer, isGlobalRib bool, policyMap map[string]*policy.Policy) *Peer { - p := &Peer{ +func NewPeer(g config.Global, config config.Neighbor) *Peer { + peer := &Peer{ globalConfig: g, - peerConfig: peer, - connCh: make(chan net.Conn), - serverMsgCh: serverMsgCh, - peerMsgCh: peerMsgCh, - getActiveCh: make(chan struct{}), + config: config, rfMap: make(map[bgp.RouteFamily]bool), capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), - isGlobalRib: isGlobalRib, - } - p.siblings = make(map[string]*serverMsgDataPeer) - for _, s := range peerList { - p.siblings[s.address.String()] = s } - p.fsm = NewFSM(&g, &peer, p.connCh) - peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE) - peer.BgpNeighborCommonState.Downtime = time.Now().Unix() - for _, rf := range peer.AfiSafiList { + + config.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE) + config.BgpNeighborCommonState.Downtime = time.Now().Unix() + for _, rf := range config.AfiSafiList { k, _ := bgp.GetRouteFamily(rf.AfiSafiName) - p.rfMap[k] = true + peer.rfMap[k] = true } - p.peerInfo = &table.PeerInfo{ - AS: peer.PeerAs, + peer.peerInfo = &table.PeerInfo{ + AS: config.PeerAs, LocalID: g.RouterId, - Address: peer.NeighborAddress, - } - if isGlobalRib { - p.peerInfo.ID = g.RouterId - } - rfList := p.configuredRFlist() - p.adjRib = table.NewAdjRib(rfList) - p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList) - p.setPolicy(policyMap) - p.t.Go(p.loop) - if !peer.TransportOptions.PassiveMode && !isGlobalRib { - p.t.Go(p.connectLoop) + Address: config.NeighborAddress, } - return p + peer.adjRib = table.NewAdjRib(peer.configuredRFlist()) + peer.fsm = NewFSM(&g, &config) + return peer } -func (peer *Peer) setPolicy(policyMap map[string]*policy.Policy) { - // configure import policy - policyConfig := peer.peerConfig.ApplyPolicy - inPolicies := make([]*policy.Policy, 0) - for _, policyName := range policyConfig.ImportPolicies { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "PolicyName": policyName, - }).Info("import policy installed") - if pol, ok := policyMap[policyName]; ok { - log.Debug("import policy : ", pol) - inPolicies = append(inPolicies, pol) - } - } - peer.importPolicies = inPolicies - peer.defaultImportPolicy = policyConfig.DefaultImportPolicy - - // configure export policy - outPolicies := make([]*policy.Policy, 0) - for _, policyName := range policyConfig.ExportPolicies { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "PolicyName": policyName, - }).Info("export policy installed") - if pol, ok := policyMap[policyName]; ok { - log.Debug("export policy : ", pol) - outPolicies = append(outPolicies, pol) - } - } - peer.exportPolicies = outPolicies - peer.defaultExportPolicy = policyConfig.DefaultExportPolicy +func (peer *Peer) isRouteServerClient() bool { + return peer.config.RouteServer.RouteServerClient } func (peer *Peer) configuredRFlist() []bgp.RouteFamily { rfList := []bgp.RouteFamily{} - for _, rf := range peer.peerConfig.AfiSafiList { + for _, rf := range peer.config.AfiSafiList { k, _ := bgp.GetRouteFamily(rf.AfiSafiName) rfList = append(rfList, k) } return rfList } -func (peer *Peer) sendPathsToSiblings(pathList []table.Path) { - if len(pathList) == 0 { - return - } - for _, s := range peer.siblings { - var pm *peerMsg - if peer.peerConfig.RouteServer.RouteServerClient { - checkas := func(asnum uint32, p []table.Path) []table.Path { - plist := []table.Path{} - for _, path := range p { - asList := path.GetAsList() - send := true - for _, as := range asList { - if as == asnum { - send = false - break - } - } - if send { - plist = append(plist, path) - } - } - return plist - } - p := checkas(s.As, pathList) - if len(p) == 0 { - continue - } else { - pm = &peerMsg{ - msgType: PEER_MSG_PATH, - msgData: p, - } - } - } else { - pm = &peerMsg{ - msgType: PEER_MSG_PATH, - msgData: pathList, - } - } - s.peerMsgCh <- pm - } -} - -func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { +func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) ([]table.Path, bool) { + pathList := []table.Path{} log.WithFields(log.Fields{ "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, + "Key": peer.config.NeighborAddress, "data": m, }).Debug("received") + update := false switch m.Header.Type { case bgp.BGP_MSG_OPEN: @@ -245,7 +124,7 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { // by using the smaller of its configured Hold Time and the Hold Time // received in the OPEN message. holdTime := float64(body.HoldTime) - myHoldTime := peer.fsm.peerConfig.Timers.HoldTime + myHoldTime := peer.config.Timers.HoldTime if holdTime > myHoldTime { peer.fsm.negotiatedHoldTime = myHoldTime } else { @@ -258,791 +137,60 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { if _, ok := peer.rfMap[rf]; !ok { log.WithFields(log.Fields{ "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, + "Key": peer.config.NeighborAddress, "Data": rf, }).Warn("Route family isn't supported") - return + break } if _, ok := peer.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; ok { - pathList := peer.adjRib.GetOutPathList(rf) - peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) + pathList = peer.adjRib.GetOutPathList(rf) } else { log.WithFields(log.Fields{ "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, + "Key": peer.config.NeighborAddress, }).Warn("ROUTE_REFRESH received but the capability wasn't advertised") } + case bgp.BGP_MSG_UPDATE: - peer.peerConfig.BgpNeighborCommonState.UpdateRecvTime = time.Now().Unix() + update = true + peer.config.BgpNeighborCommonState.UpdateRecvTime = time.Now().Unix() body := m.Body.(*bgp.BGPUpdate) _, err := bgp.ValidateUpdateMsg(body, peer.rfMap) if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, + "Key": peer.config.NeighborAddress, "error": err, }).Warn("malformed BGP update message") m := err.(*bgp.MessageError) if m.TypeCode != 0 { peer.outgoing <- bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data) } - return + break } table.UpdatePathAttrs4ByteAs(body) - pathList := table.ProcessMessage(m, peer.peerInfo) + pathList = table.ProcessMessage(m, peer.peerInfo) peer.adjRib.UpdateIn(pathList) - peer.sendPathsToSiblings(pathList) - } -} - -func (peer *Peer) sendMessages(msgs []*bgp.BGPMessage) { - for _, m := range msgs { - if peer.peerConfig.BgpNeighborCommonState.State != uint32(bgp.BGP_FSM_ESTABLISHED) { - continue - } - - if m.Header.Type != bgp.BGP_MSG_UPDATE { - log.Fatal("not update message ", m.Header.Type) - } - - _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] - if !y { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "data": m, - }).Debug("update for 2byte AS peer") - table.UpdatePathAttrs2ByteAs(m.Body.(*bgp.BGPUpdate)) - } - - peer.outgoing <- m - } -} - -func (peer *Peer) handleGrpc(grpcReq *GrpcRequest) { - result := &GrpcResponse{} - switch grpcReq.RequestType { - case REQ_GLOBAL_ADD, REQ_GLOBAL_DELETE: - rf := grpcReq.RouteFamily - path, ok := grpcReq.Data.(*api.Path) - if !ok { - result.ResponseErr = fmt.Errorf("type assertion failed") - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - var isWithdraw bool - if grpcReq.RequestType == REQ_GLOBAL_DELETE { - isWithdraw = true - } - - var nlri bgp.AddrPrefixInterface - pattr := make([]bgp.PathAttributeInterface, 0) - pattr = append(pattr, bgp.NewPathAttributeOrigin(bgp.BGP_ORIGIN_ATTR_TYPE_IGP)) - asparam := bgp.NewAs4PathParam(bgp.BGP_ASPATH_ATTR_TYPE_SEQ, []uint32{peer.peerInfo.AS}) - pattr = append(pattr, bgp.NewPathAttributeAsPath([]bgp.AsPathParamInterface{asparam})) - - switch rf { - case bgp.RF_IPv4_UC: - ip, net, _ := net.ParseCIDR(path.Nlri.Prefix) - if ip.To4() == nil { - result.ResponseErr = fmt.Errorf("Invalid ipv4 prefix: %s", path.Nlri.Prefix) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - ones, _ := net.Mask.Size() - nlri = &bgp.NLRInfo{ - IPAddrPrefix: *bgp.NewIPAddrPrefix(uint8(ones), ip.String()), - } - - pattr = append(pattr, bgp.NewPathAttributeNextHop("0.0.0.0")) - - case bgp.RF_IPv6_UC: - - ip, net, _ := net.ParseCIDR(path.Nlri.Prefix) - if ip.To16() == nil { - result.ResponseErr = fmt.Errorf("Invalid ipv6 prefix: %s", path.Nlri.Prefix) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - ones, _ := net.Mask.Size() - nlri = bgp.NewIPv6AddrPrefix(uint8(ones), ip.String()) - - pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("::", []bgp.AddrPrefixInterface{nlri})) - - case bgp.RF_EVPN: - mac, err := net.ParseMAC(path.Nlri.EvpnNlri.MacIpAdv.MacAddr) - if err != nil { - result.ResponseErr = fmt.Errorf("Invalid mac: %s", path.Nlri.EvpnNlri.MacIpAdv.MacAddr) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - ip := net.ParseIP(path.Nlri.EvpnNlri.MacIpAdv.IpAddr) - if ip == nil { - result.ResponseErr = fmt.Errorf("Invalid ip prefix: %s", path.Nlri.EvpnNlri.MacIpAdv.IpAddr) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - iplen := net.IPv4len * 8 - if ip.To4() == nil { - iplen = net.IPv6len * 8 - } - - macIpAdv := &bgp.EVPNMacIPAdvertisementRoute{ - RD: bgp.NewRouteDistinguisherTwoOctetAS(0, 0), - ESI: bgp.EthernetSegmentIdentifier{ - Type: bgp.ESI_ARBITRARY, - }, - MacAddressLength: 48, - MacAddress: mac, - IPAddressLength: uint8(iplen), - IPAddress: ip, - Labels: []uint32{0}, - } - nlri = bgp.NewEVPNNLRI(bgp.EVPN_ROUTE_TYPE_MAC_IP_ADVERTISEMENT, 0, macIpAdv) - pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("0.0.0.0", []bgp.AddrPrefixInterface{nlri})) - case bgp.RF_ENCAP: - endpoint := net.ParseIP(path.Nlri.Prefix) - if endpoint == nil { - result.ResponseErr = fmt.Errorf("Invalid endpoint ip address: %s", path.Nlri.Prefix) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - - } - nlri = bgp.NewEncapNLRI(endpoint.String()) - pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("0.0.0.0", []bgp.AddrPrefixInterface{nlri})) - - iterSubTlvs := func(subTlvs []*api.TunnelEncapSubTLV) { - for _, subTlv := range subTlvs { - if subTlv.Type == api.ENCAP_SUBTLV_TYPE_COLOR { - color := subTlv.Color - subTlv := &bgp.TunnelEncapSubTLV{ - Type: bgp.ENCAP_SUBTLV_TYPE_COLOR, - Value: &bgp.TunnelEncapSubTLVColor{color}, - } - tlv := &bgp.TunnelEncapTLV{ - Type: bgp.TUNNEL_TYPE_VXLAN, - Value: []*bgp.TunnelEncapSubTLV{subTlv}, - } - attr := bgp.NewPathAttributeTunnelEncap([]*bgp.TunnelEncapTLV{tlv}) - pattr = append(pattr, attr) - break - } - } - } - - iterTlvs := func(tlvs []*api.TunnelEncapTLV) { - for _, tlv := range tlvs { - if tlv.Type == api.TUNNEL_TYPE_VXLAN { - iterSubTlvs(tlv.SubTlv) - break - } - } - } - - func(attrs []*api.PathAttr) { - for _, attr := range attrs { - if attr.Type == api.BGP_ATTR_TYPE_TUNNEL_ENCAP { - iterTlvs(attr.TunnelEncap) - break - } - } - }(path.Attrs) - - case bgp.RF_RTC_UC: - var ec bgp.ExtendedCommunityInterface - target := path.Nlri.RtNlri.Target - ec_type := target.Type - ec_subtype := target.Subtype - switch ec_type { - case api.EXTENDED_COMMUNITIE_TYPE_TWO_OCTET_AS_SPECIFIC: - if target.Asn == 0 && target.LocalAdmin == 0 { - break - } - ec = &bgp.TwoOctetAsSpecificExtended{ - SubType: bgp.ExtendedCommunityAttrSubType(ec_subtype), - AS: uint16(target.Asn), - LocalAdmin: target.LocalAdmin, - IsTransitive: true, - } - default: - result.ResponseErr = fmt.Errorf("Invalid endpoint ip address: %s", path.Nlri.Prefix) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - - nlri = bgp.NewRouteTargetMembershipNLRI(peer.globalConfig.As, ec) - - pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("0.0.0.0", []bgp.AddrPrefixInterface{nlri})) - - default: - result.ResponseErr = fmt.Errorf("Unsupported address family: %s", rf) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - - p, err := table.CreatePath(peer.peerInfo, nlri, pattr, isWithdraw, time.Now()) - if err != nil { - result.ResponseErr = err - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) - return - } - - pm := &peerMsg{ - msgType: PEER_MSG_PATH, - msgData: []table.Path{p}, - } - peer.peerMsgCh <- pm - - case REQ_LOCAL_RIB, REQ_GLOBAL_RIB: - if peer.fsm.adminState == ADMIN_STATE_DOWN { - close(grpcReq.ResponseCh) - return - } - if t, ok := peer.rib.Tables[grpcReq.RouteFamily]; ok { - for _, dst := range t.GetDestinations() { - result := &GrpcResponse{} - result.Data = dst.ToApiStruct() - grpcReq.ResponseCh <- result - } - } - close(grpcReq.ResponseCh) - return - case REQ_NEIGHBOR_SHUTDOWN: - peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) - case REQ_NEIGHBOR_RESET: - peer.fsm.idleHoldTime = peer.peerConfig.Timers.IdleHoldTimeAfterReset - peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) - case REQ_NEIGHBOR_SOFT_RESET, REQ_NEIGHBOR_SOFT_RESET_IN: - // soft-reconfiguration inbound - peer.sendPathsToSiblings(peer.adjRib.GetInPathList(grpcReq.RouteFamily)) - if grpcReq.RequestType == REQ_NEIGHBOR_SOFT_RESET_IN { - break - } - fallthrough - case REQ_NEIGHBOR_SOFT_RESET_OUT: - pathList := peer.adjRib.GetOutPathList(grpcReq.RouteFamily) - peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) - case REQ_ADJ_RIB_IN, REQ_ADJ_RIB_OUT: - rf := grpcReq.RouteFamily - var paths []table.Path - - if grpcReq.RequestType == REQ_ADJ_RIB_IN { - paths = peer.adjRib.GetInPathList(rf) - log.Debugf("RouteFamily=%v adj-rib-in found : %d", rf.String(), len(paths)) - } else { - paths = peer.adjRib.GetOutPathList(rf) - log.Debugf("RouteFamily=%v adj-rib-out found : %d", rf.String(), len(paths)) - } - - for _, p := range paths { - result := &GrpcResponse{} - path := &api.Path{} - j, _ := json.Marshal(p) - err := json.Unmarshal(j, path) - if err != nil { - result.ResponseErr = err - } else { - result.Data = path - } - grpcReq.ResponseCh <- result - } - close(grpcReq.ResponseCh) - return - case REQ_NEIGHBOR_ENABLE, REQ_NEIGHBOR_DISABLE: - var err api.Error - if grpcReq.RequestType == REQ_NEIGHBOR_ENABLE { - select { - case peer.fsm.adminStateCh <- ADMIN_STATE_UP: - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - }).Debug("ADMIN_STATE_UP requested") - err.Code = api.Error_SUCCESS - err.Msg = "ADMIN_STATE_UP" - default: - log.Warning("previous request is still remaining. : ", peer.peerConfig.NeighborAddress) - err.Code = api.Error_FAIL - err.Msg = "previous request is still remaining" - } - } else { - select { - case peer.fsm.adminStateCh <- ADMIN_STATE_DOWN: - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - }).Debug("ADMIN_STATE_DOWN requested") - err.Code = api.Error_SUCCESS - err.Msg = "ADMIN_STATE_DOWN" - default: - log.Warning("previous request is still remaining. : ", peer.peerConfig.NeighborAddress) - err.Code = api.Error_FAIL - err.Msg = "previous request is still remaining" - } - } - result.Data = err - case REQ_NEIGHBOR_POLICY: - resInPolicies := []*api.PolicyDefinition{} - resOutPolicies := []*api.PolicyDefinition{} - // Add importpolies that has been set in the configuration file to the list. - // However, peer haven't target importpolicy when add PolicyDefinition of name only to the list. - conInPolicyNames := peer.peerConfig.ApplyPolicy.ImportPolicies - for _, conInPolicyName := range conInPolicyNames { - match := false - for _, inPolicy := range peer.importPolicies { - if conInPolicyName == inPolicy.Name { - match = true - resInPolicies = append(resInPolicies, inPolicy.ToApiStruct()) - break - } - } - if !match { - resInPolicies = append(resInPolicies, &api.PolicyDefinition{PolicyDefinitionName: conInPolicyName}) - } - } - // Add importpolies that has been set in the configuration file to the list. - // However, peer haven't target importpolicy when add PolicyDefinition of name only to the list. - conOutPolicyNames := peer.peerConfig.ApplyPolicy.ExportPolicies - for _, conOutPolicyName := range conOutPolicyNames { - match := false - for _, outPolicy := range peer.exportPolicies { - if conOutPolicyName == outPolicy.Name { - match = true - resOutPolicies = append(resOutPolicies, outPolicy.ToApiStruct()) - break - } - } - if !match { - resOutPolicies = append(resOutPolicies, &api.PolicyDefinition{PolicyDefinitionName: conOutPolicyName}) - } - } - defaultInPolicy := policy.ROUTE_REJECT - defaultOutPolicy := policy.ROUTE_REJECT - if peer.defaultImportPolicy == config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { - defaultInPolicy = policy.ROUTE_ACCEPT - } - if peer.defaultExportPolicy == config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { - defaultOutPolicy = policy.ROUTE_ACCEPT - } - result.Data = &api.ApplyPolicy{ - DefaultImportPolicy: defaultInPolicy, - ImportPolicies: resInPolicies, - DefaultExportPolicy: defaultOutPolicy, - ExportPolicies: resOutPolicies, - } - case REQ_NEIGHBOR_POLICY_ADD_IMPORT, REQ_NEIGHBOR_POLICY_ADD_EXPORT, REQ_NEIGHBOR_POLICY_DEL_IMPORT, REQ_NEIGHBOR_POLICY_DEL_EXPORT: - data := grpcReq.Data.([]interface{}) - reqApplyPolicy := data[0].(*api.ApplyPolicy) - reqPolicyMap := data[1].(map[string]*policy.Policy) - applyPolicy := &peer.peerConfig.ApplyPolicy - var defInPolicy, defOutPolicy config.DefaultPolicyType - if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_ADD_IMPORT { - if reqApplyPolicy.DefaultImportPolicy != policy.ROUTE_ACCEPT { - defInPolicy = config.DEFAULT_POLICY_TYPE_REJECT_ROUTE - } - peer.peerConfig.ApplyPolicy.DefaultImportPolicy = defInPolicy - applyPolicy.ImportPolicies = policy.PoliciesToString(reqApplyPolicy.ImportPolicies) - } else if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_ADD_EXPORT { - if reqApplyPolicy.DefaultExportPolicy != policy.ROUTE_ACCEPT { - defOutPolicy = config.DEFAULT_POLICY_TYPE_REJECT_ROUTE - } - peer.peerConfig.ApplyPolicy.DefaultExportPolicy = defOutPolicy - applyPolicy.ExportPolicies = policy.PoliciesToString(reqApplyPolicy.ExportPolicies) - } else if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_DEL_IMPORT { - peer.peerConfig.ApplyPolicy.DefaultImportPolicy = config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE - peer.peerConfig.ApplyPolicy.ImportPolicies = make([]string, 0) - } else if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_DEL_EXPORT { - peer.peerConfig.ApplyPolicy.DefaultExportPolicy = config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE - peer.peerConfig.ApplyPolicy.ExportPolicies = make([]string, 0) - } - peer.setPolicy(reqPolicyMap) - } - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) -} - -func (peer *Peer) sendUpdateMsgFromPaths(pList []table.Path) { - pList = func(arg []table.Path) []table.Path { - ret := make([]table.Path, 0, len(arg)) - for _, path := range arg { - if _, ok := peer.rfMap[path.GetRouteFamily()]; !ok { - continue - } - if peer.peerConfig.NeighborAddress.Equal(path.GetSource().Address) { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": path, - }).Debug("From me, ignore.") - continue - } - - if peer.peerConfig.PeerAs == path.GetSourceAs() { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": path, - }).Debug("AS PATH loop, ignore.") - continue - } - - if !path.IsWithdraw() { - var applied bool = false - applied, path = peer.applyPolicies(peer.exportPolicies, path) - if applied { - if path == nil { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": path, - }).Debug("Export policy applied and rejected.") - continue - } - } else if peer.defaultExportPolicy != config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": path, - }).Debug("Default export policy applied and rejected.") - continue - } - } - - ret = append(ret, path.Clone(path.IsWithdraw())) - } - return ret - }(pList) - - peer.adjRib.UpdateOut(pList) - - if bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) != bgp.BGP_FSM_ESTABLISHED || len(pList) == 0 { - return - } - - pList = func(arg []table.Path) []table.Path { - ret := make([]table.Path, 0, len(arg)) - for _, path := range pList { - isLocal := path.GetSource().ID.Equal(peer.peerInfo.LocalID) - if isLocal { - path.SetNexthop(peer.peerConfig.LocalAddress) - } else { - table.UpdatePathAttrs(path, &peer.globalConfig, &peer.peerConfig) - } - - ret = append(ret, path) - } - return ret - }(pList) - - peer.sendMessages(table.CreateUpdateMsgFromPaths(pList)) -} - -// apply policies to the path -// if multiple policies are defined, -// this function applies each policy to the path in the order that -// policies are stored in the array passed to this function. -// -// the way of applying statements inside a single policy -// - apply statement until the condition in the statement matches. -// if the condition matches the path, apply the action on the statement and -// return value that indicates 'applied' to caller of this function -// - if no statement applied, then process the next policy -// -// if no policy applied, return value that indicates 'not applied' to the caller of this function -// -// return values: -// bool -- indicates that any of policy applied to the path that is passed to this function -// table.Path -- indicates new path object that is the result of modification according to -// policy's action. -// If the applied policy doesn't have a modification action, -// then return the path itself that is passed to this function, otherwise return -// modified path. -// If action of the policy is 'reject', return nil -// -func (peer *Peer) applyPolicies(policies []*policy.Policy, original table.Path) (bool, table.Path) { - - var applied bool = true - - for _, pol := range policies { - if result, action, newpath := pol.Apply(original); result { - log.Debug("newpath: ", newpath) - if action == policy.ROUTE_TYPE_REJECT { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "NRLI": original.GetNlri(), - }).Debug("path was rejected") - // return applied, nil, this means path was rejected - return applied, nil - } else { - // return applied, new path - return applied, newpath - } - } } - log.Debug("no policy applied.", original) - // return not applied, original path - return !applied, original + return pathList, update } -func (peer *Peer) handlePeerMsg(m *peerMsg) { - switch m.msgType { - case PEER_MSG_PATH: - pList := m.msgData.([]table.Path) - - tmp := make([]table.Path, 0, len(pList)) - for _, path := range pList { - if path.IsWithdraw() { - tmp = append(tmp, path) - continue - } - - applied, path := peer.applyPolicies(peer.importPolicies, path) - if applied && path == nil { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": path, - }).Debug("Import policy applied, reject.") - continue - } else if peer.defaultImportPolicy != config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": path, - }).Debug("Default import policy applied, reject.") - continue - } - - tmp = append(tmp, path) - } - pList = tmp - - if peer.peerConfig.RouteServer.RouteServerClient || peer.isGlobalRib { - pList, _ = peer.rib.ProcessPaths(pList) - } - - if peer.isGlobalRib { - peer.sendPathsToSiblings(pList) - } else { - peer.sendUpdateMsgFromPaths(pList) - } - - case PEER_MSG_PEER_DOWN: - for _, rf := range peer.configuredRFlist() { - pList, _ := peer.rib.DeletePathsforPeer(m.msgData.(*table.PeerInfo), rf) - if peer.peerConfig.RouteServer.RouteServerClient { - peer.sendUpdateMsgFromPaths(pList) - } else if peer.isGlobalRib { - peer.sendPathsToSiblings(pList) - } - } - } -} - -func (peer *Peer) handleServerMsg(m *serverMsg) { - switch m.msgType { - case SRV_MSG_PEER_ADDED: - d := m.msgData.(*serverMsgDataPeer) - peer.siblings[d.address.String()] = d - for _, rf := range peer.configuredRFlist() { - if peer.peerConfig.RouteServer.RouteServerClient { - peer.sendPathsToSiblings(peer.adjRib.GetInPathList(rf)) - } else if peer.isGlobalRib { - peer.sendPathsToSiblings(peer.rib.GetPathList(rf)) - } - } - case SRV_MSG_PEER_DELETED: - d := m.msgData.(*table.PeerInfo) - if _, ok := peer.siblings[d.Address.String()]; ok { - delete(peer.siblings, d.Address.String()) - for _, rf := range peer.configuredRFlist() { - pList, _ := peer.rib.DeletePathsforPeer(d, rf) - if peer.peerConfig.RouteServer.RouteServerClient { - peer.sendUpdateMsgFromPaths(pList) - } else { - peer.sendPathsToSiblings(pList) - } - } - } else { - log.Warning("can not find peer: ", d.Address.String()) - } - case SRV_MSG_API: - peer.handleGrpc(m.msgData.(*GrpcRequest)) - case SRV_MSG_POLICY_UPDATED: - log.Debug("policy updated") - d := m.msgData.(map[string]*policy.Policy) - peer.setPolicy(d) - default: - log.Fatal("unknown server msg type ", m.msgType) - } -} - -func (peer *Peer) connectLoop() error { - var tick int - if tick = int(peer.fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY { - tick = MIN_CONNECT_RETRY - } - - ticker := time.NewTicker(time.Duration(tick) * time.Second) - ticker.Stop() - - connect := func() { - if bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE { - var host string - addr := peer.peerConfig.NeighborAddress - - if addr.To4() != nil { - host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT) - } else { - host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT) - } - - conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second) - if err == nil { - peer.connCh <- conn - } else { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - }).Debugf("failed to connect: %s", err) - } - } - } - - for { - select { - case <-peer.t.Dying(): - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - }).Debug("stop connect loop") - ticker.Stop() - return nil - case <-ticker.C: - connect() - case <-peer.getActiveCh: - ticker = time.NewTicker(time.Duration(tick) * time.Second) - } - } -} - -// this goroutine handles routing table operations -func (peer *Peer) loop() error { - for { - incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH) - peer.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH) - - var h *FSMHandler - - if !peer.isGlobalRib { - h = NewFSMHandler(peer.fsm, incoming, peer.outgoing) - switch peer.peerConfig.BgpNeighborCommonState.State { - case uint32(bgp.BGP_FSM_ESTABLISHED): - peer.peerConfig.LocalAddress = peer.fsm.LocalAddr() - for rf, _ := range peer.rfMap { - pathList := peer.adjRib.GetOutPathList(rf) - if !peer.peerConfig.RouteServer.RouteServerClient { - for _, path := range pathList { - path.SetNexthop(peer.peerConfig.LocalAddress) - } - } - peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) - } - peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix() - peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ - case uint32(bgp.BGP_FSM_ACTIVE): - if !peer.peerConfig.TransportOptions.PassiveMode { - peer.getActiveCh <- struct{}{} - } - fallthrough - default: - peer.fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix() - } - } - - sameState := true - for sameState { - select { - case <-peer.t.Dying(): - close(peer.connCh) - peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil) - // h.t.Kill(nil) will be called - // internall so even goroutines in - // non-established will be killed. - h.Stop() - return nil - case e := <-incoming: - switch e.MsgType { - case FSM_MSG_STATE_CHANGE: - nextState := e.MsgData.(bgp.FSMState) - // waits for all goroutines created for the current state - h.Wait() - oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) - peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState) - peer.fsm.StateChange(nextState) - sameState = false - if oldState == bgp.BGP_FSM_ESTABLISHED { - t := time.Now() - if t.Sub(time.Unix(peer.fsm.peerConfig.BgpNeighborCommonState.Uptime, 0)) < FLOP_THRESHOLD { - peer.fsm.peerConfig.BgpNeighborCommonState.Flops++ - } - - for _, rf := range peer.configuredRFlist() { - peer.adjRib.DropAllIn(rf) - } - pm := &peerMsg{ - msgType: PEER_MSG_PEER_DOWN, - msgData: peer.peerInfo, - } - for _, s := range peer.siblings { - s.peerMsgCh <- pm - } - } - - // clear counter - if h.fsm.adminState == ADMIN_STATE_DOWN { - h.fsm.peerConfig.BgpNeighborCommonState = config.BgpNeighborCommonState{} - } - - case FSM_MSG_BGP_MESSAGE: - switch m := e.MsgData.(type) { - case *bgp.MessageError: - peer.outgoing <- bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data) - case *bgp.BGPMessage: - peer.handleBGPmessage(m) - default: - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.peerConfig.NeighborAddress, - "Data": e.MsgData, - }).Panic("unknonw msg type") - } - } - case m := <-peer.serverMsgCh: - peer.handleServerMsg(m) - case m := <-peer.peerMsgCh: - peer.handlePeerMsg(m) - } +func (peer *Peer) getBests(loc *LocalRib) []table.Path { + pathList := []table.Path{} + for _, rf := range peer.configuredRFlist() { + for _, dst := range loc.rib.Tables[rf].GetDestinations() { + pathList = append(pathList, dst.GetBestPath()) } } + return pathList } -func (peer *Peer) Stop() error { - peer.t.Kill(nil) - return peer.t.Wait() +func (peer *Peer) startFSMHandler(incoming chan *fsmMsg) { + peer.fsm.h = NewFSMHandler(peer.fsm, incoming, peer.outgoing) } func (peer *Peer) PassConn(conn *net.TCPConn) { - peer.connCh <- conn + peer.fsm.connCh <- conn } func (peer *Peer) MarshalJSON() ([]byte, error) { @@ -1059,7 +207,7 @@ func (peer *Peer) ToApiStruct() *api.Peer { remoteCap = append(remoteCap, c.ToApiStruct()) } - caps := capabilitiesFromConfig(&peer.globalConfig, &peer.peerConfig) + caps := capabilitiesFromConfig(&peer.globalConfig, &peer.config) localCap := make([]*api.Capability, 0, len(caps)) for _, c := range caps { localCap = append(localCap, c.ToApiStruct()) @@ -1071,8 +219,8 @@ func (peer *Peer) ToApiStruct() *api.Peer { RemoteAs: c.PeerAs, RemoteCap: remoteCap, LocalCap: localCap, - KeepaliveInterval: uint32(peer.fsm.peerConfig.Timers.KeepaliveInterval), - Holdtime: uint32(peer.fsm.peerConfig.Timers.HoldTime), + KeepaliveInterval: uint32(peer.config.Timers.KeepaliveInterval), + Holdtime: uint32(peer.config.Timers.HoldTime), } s := c.BgpNeighborCommonState @@ -1099,10 +247,10 @@ func (peer *Peer) ToApiStruct() *api.Peer { keepalive := uint32(0) if f.negotiatedHoldTime != 0 { - if f.negotiatedHoldTime < f.peerConfig.Timers.HoldTime { + if f.negotiatedHoldTime < c.Timers.HoldTime { keepalive = uint32(f.negotiatedHoldTime / 3) } else { - keepalive = uint32(f.peerConfig.Timers.KeepaliveInterval) + keepalive = uint32(c.Timers.KeepaliveInterval) } } @@ -1140,3 +288,123 @@ func (peer *Peer) ToApiStruct() *api.Peer { Info: info, } } + +type LocalRib struct { + rib *table.TableManager + importPolicies []*policy.Policy + defaultImportPolicy config.DefaultPolicyType + exportPolicies []*policy.Policy + defaultExportPolicy config.DefaultPolicyType +} + +func NewLocalRib(owner string, rfList []bgp.RouteFamily, policyMap map[string]*policy.Policy) *LocalRib { + return &LocalRib{ + rib: table.NewTableManager(owner, rfList), + } +} + +func (loc *LocalRib) OwnerName() string { + return loc.rib.OwnerName() +} + +func (loc *LocalRib) isGlobal() bool { + return loc.OwnerName() == "global" +} + +func (loc *LocalRib) setPolicy(peer *Peer, policyMap map[string]*policy.Policy) { + // configure import policy + policyConfig := peer.config.ApplyPolicy + inPolicies := make([]*policy.Policy, 0) + for _, policyName := range policyConfig.ImportPolicies { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "PolicyName": policyName, + }).Info("import policy installed") + if pol, ok := policyMap[policyName]; ok { + log.Debug("import policy : ", pol) + inPolicies = append(inPolicies, pol) + } + } + loc.importPolicies = inPolicies + loc.defaultImportPolicy = policyConfig.DefaultImportPolicy + + // configure export policy + outPolicies := make([]*policy.Policy, 0) + for _, policyName := range policyConfig.ExportPolicies { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "PolicyName": policyName, + }).Info("export policy installed") + if pol, ok := policyMap[policyName]; ok { + log.Debug("export policy : ", pol) + outPolicies = append(outPolicies, pol) + } + } + loc.exportPolicies = outPolicies + loc.defaultExportPolicy = policyConfig.DefaultExportPolicy +} + +// apply policies to the path +// if multiple policies are defined, +// this function applies each policy to the path in the order that +// policies are stored in the array passed to this function. +// +// the way of applying statements inside a single policy +// - apply statement until the condition in the statement matches. +// if the condition matches the path, apply the action on the statement and +// return value that indicates 'applied' to caller of this function +// - if no statement applied, then process the next policy +// +// if no policy applied, return value that indicates 'not applied' to the caller of this function +// +// return values: +// bool -- indicates that any of policy applied to the path that is passed to this function +// table.Path -- indicates new path object that is the result of modification according to +// policy's action. +// If the applied policy doesn't have a modification action, +// then return the path itself that is passed to this function, otherwise return +// modified path. +// If action of the policy is 'reject', return nil +// +func (loc *LocalRib) applyPolicies(isExport bool, original table.Path) (bool, table.Path) { + + var applied bool = true + var policies []*policy.Policy + var direction string + if isExport == true { + policies = loc.exportPolicies + direction = "export" + } else { + policies = loc.importPolicies + direction = "import" + } + + for _, pol := range policies { + if result, action, newpath := pol.Apply(original); result { + log.Debug("newpath: ", newpath) + if action == policy.ROUTE_TYPE_REJECT { + log.WithFields(log.Fields{ + "Topic": "Loc", + "Key": loc.OwnerName(), + "NRLI": original.GetNlri(), + "Dir": direction, + }).Debug("path was rejected") + // return applied, nil, this means path was rejected + return applied, nil + } else { + // return applied, new path + return applied, newpath + } + } + } + log.WithFields(log.Fields{ + "Topic": "Loc", + "Key": loc.OwnerName(), + "Len": len(policies), + "NRLI": original, + "Dir": direction, + }).Debug("no policy applied") + return !applied, original +} diff --git a/server/peer_test.go b/server/peer_test.go deleted file mode 100644 index 756607e2..00000000 --- a/server/peer_test.go +++ /dev/null @@ -1,522 +0,0 @@ -// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "fmt" - log "github.com/Sirupsen/logrus" - "github.com/osrg/gobgp/api" - "github.com/osrg/gobgp/config" - "github.com/osrg/gobgp/packet" - "github.com/osrg/gobgp/table" - "github.com/stretchr/testify/assert" - "net" - "reflect" - "testing" - "time" -) - -func peerRC3() *table.PeerInfo { - peer := &table.PeerInfo{ - AS: 66003, - ID: net.ParseIP("10.0.255.3").To4(), - LocalID: net.ParseIP("10.0.255.1").To4(), - } - return peer -} - -func createAsPathAttribute(ases []uint32) *bgp.PathAttributeAsPath { - aspathParam := []bgp.AsPathParamInterface{bgp.NewAs4PathParam(2, ases)} - aspath := bgp.NewPathAttributeAsPath(aspathParam) - return aspath -} - -func createMpReach(nexthop string, prefix []bgp.AddrPrefixInterface) *bgp.PathAttributeMpReachNLRI { - mp_reach := bgp.NewPathAttributeMpReachNLRI(nexthop, prefix) - return mp_reach -} - -func update_fromRC3() *bgp.BGPMessage { - pathAttributes := []bgp.PathAttributeInterface{ - bgp.NewPathAttributeOrigin(1), - createAsPathAttribute([]uint32{66003, 4000, 70000}), - createMpReach("2001:db8::3", - []bgp.AddrPrefixInterface{bgp.NewIPv6AddrPrefix(64, "38:38:38:38::")}), - } - return bgp.NewBGPUpdateMessage([]bgp.WithdrawnRoute{}, pathAttributes, []bgp.NLRInfo{}) -} - -func TestProcessBGPUpdate_fourbyteAS(t *testing.T) { - rib1 := table.NewTableManager("peer_test", []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}) - - m := update_fromRC3() - peerInfo := peerRC3() - pathList := table.ProcessMessage(m, peerInfo) - - pList, _ := rib1.ProcessPaths(pathList) - assert.Equal(t, len(pList), 1) - assert.Equal(t, pList[0].IsWithdraw(), false) - fmt.Println(pList) - sendMsg := table.CreateUpdateMsgFromPaths(pList) - assert.Equal(t, len(sendMsg), 1) - table.UpdatePathAttrs2ByteAs(sendMsg[0].Body.(*bgp.BGPUpdate)) - update := sendMsg[0].Body.(*bgp.BGPUpdate) - assert.Equal(t, len(update.PathAttributes), 4) - assert.Equal(t, reflect.TypeOf(update.PathAttributes[3]).String(), "*bgp.PathAttributeAs4Path") - attr := update.PathAttributes[3].(*bgp.PathAttributeAs4Path) - assert.Equal(t, len(attr.Value), 1) - assert.Equal(t, attr.Value[0].AS, []uint32{66003, 70000}) - attrAS := update.PathAttributes[1].(*bgp.PathAttributeAsPath) - assert.Equal(t, len(attrAS.Value), 1) - assert.Equal(t, attrAS.Value[0].(*bgp.AsPathParam).AS, []uint16{bgp.AS_TRANS, 4000, bgp.AS_TRANS}) - - rib2 := table.NewTableManager("peer_test", []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}) - pList2, _ := rib2.ProcessPaths(pathList) - assert.Equal(t, len(pList2), 1) - assert.Equal(t, pList[0].IsWithdraw(), false) - sendMsg2 := table.CreateUpdateMsgFromPaths(pList2) - assert.Equal(t, len(sendMsg2), 1) - update2 := sendMsg2[0].Body.(*bgp.BGPUpdate) - assert.Equal(t, len(update2.PathAttributes), 3) - attrAS2 := update2.PathAttributes[1].(*bgp.PathAttributeAsPath) - assert.Equal(t, len(attrAS2.Value), 1) - assert.Equal(t, attrAS2.Value[0].(*bgp.As4PathParam).AS, []uint32{66003, 4000, 70000}) -} - -func TestPeerAdminShutdownWhileEstablished(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - m := NewMockConnection() - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 10 - - peer.t.Go(peer.loop) - pushPackets := func() { - o, _ := open().Serialize() - m.setData(o) - k, _ := keepalive().Serialize() - m.setData(k) - } - go pushPackets() - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - peer.connCh <- m - waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000) - - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 1000) - - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - lastMsg := m.sendBuf[len(m.sendBuf)-1] - sent, _ := bgp.ParseBGPMessage(lastMsg) - assert.Equal(uint8(bgp.BGP_MSG_NOTIFICATION), sent.Header.Type) - assert.Equal(uint8(bgp.BGP_ERROR_CEASE), sent.Body.(*bgp.BGPNotification).ErrorCode) - assert.Equal(uint8(bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN), sent.Body.(*bgp.BGPNotification).ErrorSubcode) - assert.True(m.isClosed) - - // check counter - counter := peer.fsm.peerConfig.BgpNeighborCommonState - assertCounter(assert, counter) -} - -func TestPeerAdminShutdownWhileIdle(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 10 - peer.fsm.idleHoldTime = 5 - peer.t.Go(peer.loop) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 1000) - - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 100) - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - - // check counter - counter := peer.fsm.peerConfig.BgpNeighborCommonState - assertCounter(assert, counter) -} - -func TestPeerAdminShutdownWhileActive(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 10 - peer.t.Go(peer.loop) - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 100) - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - - // check counter - counter := peer.fsm.peerConfig.BgpNeighborCommonState - assertCounter(assert, counter) -} - -func TestPeerAdminShutdownWhileOpensent(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - m := NewMockConnection() - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 1 - peer.t.Go(peer.loop) - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - peer.connCh <- m - waitUntil(assert, bgp.BGP_FSM_OPENSENT, peer, 1000) - - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 100) - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - lastMsg := m.sendBuf[len(m.sendBuf)-1] - sent, _ := bgp.ParseBGPMessage(lastMsg) - assert.NotEqual(bgp.BGP_MSG_NOTIFICATION, sent.Header.Type) - assert.True(m.isClosed) - - // check counter - counter := peer.fsm.peerConfig.BgpNeighborCommonState - assertCounter(assert, counter) -} - -func TestPeerAdminShutdownWhileOpenconfirm(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - m := NewMockConnection() - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 10 - peer.t.Go(peer.loop) - pushPackets := func() { - o, _ := open().Serialize() - m.setData(o) - } - go pushPackets() - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - peer.connCh <- m - waitUntil(assert, bgp.BGP_FSM_OPENCONFIRM, peer, 1000) - - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 1000) - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - lastMsg := m.sendBuf[len(m.sendBuf)-1] - sent, _ := bgp.ParseBGPMessage(lastMsg) - assert.NotEqual(bgp.BGP_MSG_NOTIFICATION, sent.Header.Type) - assert.True(m.isClosed) - - // check counter - counter := peer.fsm.peerConfig.BgpNeighborCommonState - assertCounter(assert, counter) - -} - -func TestPeerAdminEnable(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - m := NewMockConnection() - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - - peer.fsm.opensentHoldTime = 5 - peer.t.Go(peer.loop) - pushPackets := func() { - o, _ := open().Serialize() - m.setData(o) - k, _ := keepalive().Serialize() - m.setData(k) - } - go pushPackets() - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - peer.connCh <- m - waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000) - - // shutdown peer at first - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 100) - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - - // enable peer - grpcReq = NewGrpcRequest(REQ_NEIGHBOR_ENABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg = &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - peer.serverMsgCh <- msg - result = <-grpcReq.ResponseCh - err = result.Data.(api.Error) - assert.Equal(err.Code, api.Error_SUCCESS) - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, (HOLDTIME_IDLE+1)*1000) - assert.Equal(bgp.BGP_FSM_ACTIVE, peer.fsm.state) - - m2 := NewMockConnection() - pushPackets = func() { - o, _ := open().Serialize() - m2.setData(o) - k, _ := keepalive().Serialize() - m2.setData(k) - } - go pushPackets() - - peer.connCh <- m2 - - waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000) - assert.Equal(bgp.BGP_FSM_ESTABLISHED, peer.fsm.state) -} - -func TestPeerAdminShutdownReject(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - m := NewMockConnection() - m.wait = 500 - - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 100000 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 1 - peer.t.Go(peer.loop) - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - peer.connCh <- m - waitUntil(assert, bgp.BGP_FSM_OPENSENT, peer, 1000) - - grpcReq := NewGrpcRequest(REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.fsm.adminStateCh <- ADMIN_STATE_DOWN - - peer.serverMsgCh <- msg - result := <-grpcReq.ResponseCh - err := result.Data.(api.Error) - assert.Equal(err.Code, api.Error_FAIL) - - grpcReq = NewGrpcRequest(REQ_NEIGHBOR_ENABLE, "0.0.0.0", bgp.RF_IPv4_UC, nil) - msg = &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - - peer.serverMsgCh <- msg - result = <-grpcReq.ResponseCh - err = result.Data.(api.Error) - assert.Equal(err.Code, api.Error_FAIL) - - waitUntil(assert, bgp.BGP_FSM_IDLE, peer, 1000) - assert.Equal(bgp.BGP_FSM_IDLE, peer.fsm.state) - assert.Equal(ADMIN_STATE_DOWN, peer.fsm.adminState) - -} - -func TestPeerSelectSmallerHoldtime(t *testing.T) { - log.SetLevel(log.DebugLevel) - assert := assert.New(t) - m := NewMockConnection() - - globalConfig := config.Global{} - peerConfig := config.Neighbor{} - peerConfig.PeerAs = 65001 - peerConfig.Timers.KeepaliveInterval = 5 - peer := makePeer(globalConfig, peerConfig) - peer.fsm.opensentHoldTime = 1 - peerConfig.Timers.HoldTime = 5 - peer.t.Go(peer.loop) - - pushPackets := func() { - opn := bgp.NewBGPOpenMessage(65001, 0, "10.0.0.1", []bgp.OptionParameterInterface{}) - o, _ := opn.Serialize() - m.setData(o) - } - go pushPackets() - - waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000) - peer.connCh <- m - waitUntil(assert, bgp.BGP_FSM_OPENCONFIRM, peer, 1000) - - assert.Equal(float64(0), peer.fsm.negotiatedHoldTime) -} - -func assertCounter(assert *assert.Assertions, counter config.BgpNeighborCommonState) { - assert.Equal(uint32(0), counter.OpenIn) - assert.Equal(uint32(0), counter.OpenOut) - assert.Equal(uint32(0), counter.UpdateIn) - assert.Equal(uint32(0), counter.UpdateOut) - assert.Equal(uint32(0), counter.KeepaliveIn) - assert.Equal(uint32(0), counter.KeepaliveOut) - assert.Equal(uint32(0), counter.NotifyIn) - assert.Equal(uint32(0), counter.NotifyOut) - assert.Equal(uint32(0), counter.EstablishedCount) - assert.Equal(uint32(0), counter.TotalIn) - assert.Equal(uint32(0), counter.TotalOut) - assert.Equal(uint32(0), counter.RefreshIn) - assert.Equal(uint32(0), counter.RefreshOut) - assert.Equal(uint32(0), counter.DynamicCapIn) - assert.Equal(uint32(0), counter.DynamicCapOut) - assert.Equal(uint32(0), counter.EstablishedCount) - assert.Equal(uint32(0), counter.Flops) -} - -func waitUntil(assert *assert.Assertions, state bgp.FSMState, peer *Peer, timeout int64) { - isTimeout := false - expire := func() { - isTimeout = true - } - time.AfterFunc((time.Duration)(timeout)*time.Millisecond, expire) - - for { - time.Sleep(1 * time.Millisecond) - - if peer.fsm.state == state || isTimeout { - assert.Equal(state, peer.fsm.state, "timeout") - break - } - } -} - -func makePeer(globalConfig config.Global, peerConfig config.Neighbor) *Peer { - - sch := make(chan *serverMsg, 8) - pch := make(chan *peerMsg, 4096) - - p := &Peer{ - globalConfig: globalConfig, - peerConfig: peerConfig, - connCh: make(chan net.Conn), - serverMsgCh: sch, - peerMsgCh: pch, - getActiveCh: make(chan struct{}), - rfMap: make(map[bgp.RouteFamily]bool), - capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), - } - p.siblings = make(map[string]*serverMsgDataPeer) - - p.fsm = NewFSM(&globalConfig, &peerConfig, p.connCh) - peerConfig.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE) - peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix() - if peerConfig.NeighborAddress.To4() != nil { - p.rfMap[bgp.RF_IPv4_UC] = true - } else { - p.rfMap[bgp.RF_IPv6_UC] = true - } - - p.peerInfo = &table.PeerInfo{ - AS: peerConfig.PeerAs, - LocalID: globalConfig.RouterId, - Address: peerConfig.NeighborAddress, - } - rfList := []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC} - p.adjRib = table.NewAdjRib(rfList) - p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList) - p.t.Go(p.connectLoop) - - return p -} diff --git a/server/server.go b/server/server.go index 930e1067..d87bb742 100644 --- a/server/server.go +++ b/server/server.go @@ -16,43 +16,30 @@ package server import ( + "encoding/json" "fmt" log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/policy" + "github.com/osrg/gobgp/table" + "gopkg.in/tomb.v2" "net" "os" "strconv" + "time" ) -type serverMsgType int - const ( - _ serverMsgType = iota - SRV_MSG_PEER_ADDED - SRV_MSG_PEER_DELETED - SRV_MSG_API - SRV_MSG_POLICY_UPDATED + GLOBAL_RIB_NAME = "global" ) -type serverMsg struct { - msgType serverMsgType - msgData interface{} -} - -type serverMsgDataPeer struct { - peerMsgCh chan *peerMsg - address net.IP - As uint32 -} - -type peerMapInfo struct { - peer *Peer - serverMsgCh chan *serverMsg - peerMsgCh chan *peerMsg - peerMsgData *serverMsgDataPeer - isRouteServerClient bool +type SenderMsg struct { + messages []*bgp.BGPMessage + sendCh chan *bgp.BGPMessage + destination string + twoBytesAs bool } type BgpServer struct { @@ -62,11 +49,12 @@ type BgpServer struct { deletedPeerCh chan config.Neighbor GrpcReqCh chan *GrpcRequest listenPort int - peerMap map[string]peerMapInfo - globalRib *Peer policyUpdateCh chan config.RoutingPolicy policyMap map[string]*policy.Policy routingPolicy config.RoutingPolicy + + neighborMap map[string]*Peer + localRibMap map[string]*LocalRib } func NewBgpServer(port int) *BgpServer { @@ -76,6 +64,8 @@ func NewBgpServer(port int) *BgpServer { b.deletedPeerCh = make(chan config.Neighbor) b.GrpcReqCh = make(chan *GrpcRequest, 1) b.policyUpdateCh = make(chan config.RoutingPolicy) + b.localRibMap = make(map[string]*LocalRib) + b.neighborMap = make(map[string]*Peer) b.listenPort = port return &b } @@ -107,18 +97,50 @@ func listenAndAccept(proto string, port int, ch chan *net.TCPConn) (*net.TCPList return l, nil } +func (server *BgpServer) addLocalRib(rib *LocalRib) { + server.localRibMap[rib.OwnerName()] = rib +} + func (server *BgpServer) Serve() { g := <-server.globalTypeCh server.bgpConfig.Global = g - globalSch := make(chan *serverMsg, 8) - globalPch := make(chan *peerMsg, 4096) - neighConf := config.Neighbor{ - NeighborAddress: g.RouterId, - AfiSafiList: g.AfiSafiList, - PeerAs: g.As, - } - server.globalRib = NewPeer(g, neighConf, globalSch, globalPch, nil, true, make(map[string]*policy.Policy)) + senderCh := make(chan *SenderMsg, 1<<16) + go func(ch chan *SenderMsg) { + for { + // TODO: must be more clever. Slow peer makes other peers slow too. + m := <-ch + w := func(c chan *bgp.BGPMessage, msg *bgp.BGPMessage) { + // nasty but the peer could already become non established state before here. + defer func() { recover() }() + c <- msg + } + + for _, b := range m.messages { + if m.twoBytesAs == false && b.Header.Type == bgp.BGP_MSG_UPDATE { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": m.destination, + "Data": b, + }).Debug("update for 2byte AS peer") + table.UpdatePathAttrs2ByteAs(b.Body.(*bgp.BGPUpdate)) + } + w(m.sendCh, b) + } + } + }(senderCh) + + // FIXME + rfList := func(l []config.AfiSafi) []bgp.RouteFamily { + rfList := []bgp.RouteFamily{} + for _, rf := range l { + k, _ := bgp.GetRouteFamily(rf.AfiSafiName) + rfList = append(rfList, k) + } + return rfList + }(g.AfiSafiList) + + server.addLocalRib(NewLocalRib(GLOBAL_RIB_NAME, rfList, make(map[string]*policy.Policy))) listenerMap := make(map[string]*net.TCPListener) acceptCh := make(chan *net.TCPConn) @@ -141,100 +163,372 @@ func (server *BgpServer) Serve() { return l } - server.peerMap = make(map[string]peerMapInfo) + incoming := make(chan *fsmMsg, 4096) + var senderMsgs []*SenderMsg for { + var firstMsg *SenderMsg + var sCh chan *SenderMsg + if len(senderMsgs) > 0 { + sCh = senderCh + firstMsg = senderMsgs[0] + } select { case conn := <-acceptCh: remoteAddr, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - info, found := server.peerMap[remoteAddr] + peer, found := server.neighborMap[remoteAddr] if found { log.Debug("accepted a new passive connection from ", remoteAddr) - info.peer.PassConn(conn) + peer.PassConn(conn) } else { log.Info("can't find configuration for a new passive connection from ", remoteAddr) conn.Close() } - case peer := <-server.addedPeerCh: - addr := peer.NeighborAddress.String() - SetTcpMD5SigSockopts(listener(peer.NeighborAddress), addr, peer.AuthPassword) - sch := make(chan *serverMsg, 8) - pch := make(chan *peerMsg, 4096) - var l []*serverMsgDataPeer - if peer.RouteServer.RouteServerClient { - for _, v := range server.peerMap { - if v.isRouteServerClient { - l = append(l, v.peerMsgData) + case config := <-server.addedPeerCh: + addr := config.NeighborAddress.String() + _, found := server.neighborMap[addr] + if found { + log.Warn("Can't overwrite the exising peer ", addr) + continue + } + + SetTcpMD5SigSockopts(listener(config.NeighborAddress), addr, config.AuthPassword) + + peer := NewPeer(g, config) + name := config.NeighborAddress.String() + + if config.RouteServer.RouteServerClient == true { + loc := NewLocalRib(name, peer.configuredRFlist(), make(map[string]*policy.Policy)) + server.addLocalRib(loc) + loc.setPolicy(peer, server.policyMap) + + pathList := make([]table.Path, 0) + for _, p := range server.neighborMap { + if p.isRouteServerClient() == false { + continue + } + for _, rf := range peer.configuredRFlist() { + pathList = append(pathList, p.adjRib.GetInPathList(rf)...) } } - } else { - globalRib := &serverMsgDataPeer{ - address: server.bgpConfig.Global.RouterId, - peerMsgCh: globalPch, + pathList = applyPolicies(peer, loc, false, pathList) + if len(pathList) > 0 { + loc.rib.ProcessPaths(pathList) } - l = []*serverMsgDataPeer{globalRib} - } - p := NewPeer(server.bgpConfig.Global, peer, sch, pch, l, false, server.policyMap) - d := &serverMsgDataPeer{ - address: peer.NeighborAddress, - peerMsgCh: pch, - As: peer.PeerAs, - } - msg := &serverMsg{ - msgType: SRV_MSG_PEER_ADDED, - msgData: d, - } - if peer.RouteServer.RouteServerClient { - sendServerMsgToRSClients(server.peerMap, msg) - } else { - globalSch <- msg } + server.neighborMap[name] = peer + peer.outgoing = make(chan *bgp.BGPMessage, 128) + peer.startFSMHandler(incoming) - server.peerMap[peer.NeighborAddress.String()] = peerMapInfo{ - peer: p, - serverMsgCh: sch, - peerMsgData: d, - isRouteServerClient: peer.RouteServer.RouteServerClient, - } - case peer := <-server.deletedPeerCh: - addr := peer.NeighborAddress.String() - SetTcpMD5SigSockopts(listener(peer.NeighborAddress), addr, "") - info, found := server.peerMap[addr] + case config := <-server.deletedPeerCh: + addr := config.NeighborAddress.String() + SetTcpMD5SigSockopts(listener(config.NeighborAddress), addr, "") + peer, found := server.neighborMap[addr] if found { log.Info("Delete a peer configuration for ", addr) - info.peer.Stop() - delete(server.peerMap, addr) - msg := &serverMsg{ - msgType: SRV_MSG_PEER_DELETED, - msgData: info.peer.peerInfo, + go func(addr string) { + t := time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h.t for ", addr) }) + peer.fsm.h.t.Kill(nil) + peer.fsm.h.t.Wait() + t.Stop() + t = time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h for ", addr) }) + peer.fsm.t.Kill(nil) + peer.fsm.t.Wait() + t.Stop() + }(addr) + + m := server.dropPeerAllRoutes(peer) + if len(m) > 0 { + senderMsgs = append(senderMsgs, m...) } - if info.isRouteServerClient { - sendServerMsgToRSClients(server.peerMap, msg) - } else { - globalSch <- msg + delete(server.neighborMap, addr) + if peer.isRouteServerClient() { + delete(server.localRibMap, addr) } } else { log.Info("Can't delete a peer configuration for ", addr) } + case e := <-incoming: + peer, found := server.neighborMap[e.MsgSrc] + if !found { + log.Warn("Can't find the neighbor ", e.MsgSrc) + break + } + m := server.handleFSMMessage(peer, e, incoming) + if len(m) > 0 { + senderMsgs = append(senderMsgs, m...) + } + case sCh <- firstMsg: + senderMsgs = senderMsgs[1:] + case grpcReq := <-server.GrpcReqCh: - server.handleGrpc(grpcReq) + m := server.handleGrpc(grpcReq) + if len(m) > 0 { + senderMsgs = append(senderMsgs, m...) + } case pl := <-server.policyUpdateCh: server.handlePolicy(pl) } } } -func sendServerMsgToAll(peerMap map[string]peerMapInfo, msg *serverMsg) { - for _, info := range peerMap { - info.serverMsgCh <- msg +func dropSameAsPath(asnum uint32, p []table.Path) []table.Path { + pathList := []table.Path{} + for _, path := range p { + asList := path.GetAsList() + send := true + for _, as := range asList { + if as == asnum { + send = false + break + } + } + if send { + pathList = append(pathList, path) + } + } + return pathList +} + +func newSenderMsg(peer *Peer, messages []*bgp.BGPMessage) *SenderMsg { + _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + return &SenderMsg{ + messages: messages, + sendCh: peer.outgoing, + destination: peer.config.NeighborAddress.String(), + twoBytesAs: y, + } +} + +func filterpath(peer *Peer, pathList []table.Path) []table.Path { + filtered := make([]table.Path, 0) + + for _, path := range pathList { + if _, ok := peer.rfMap[path.GetRouteFamily()]; !ok { + continue + } + + if peer.config.NeighborAddress.Equal(path.GetSource().Address) { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "Data": path, + }).Debug("From me, ignore.") + continue + } + + if peer.config.PeerAs == path.GetSourceAs() { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "Data": path, + }).Debug("AS PATH loop, ignore.") + continue + } + filtered = append(filtered, path.Clone(path.IsWithdraw())) + } + return filtered +} + +func (server *BgpServer) dropPeerAllRoutes(peer *Peer) []*SenderMsg { + msgs := make([]*SenderMsg, 0) + + for _, rf := range peer.configuredRFlist() { + if peer.isRouteServerClient() { + for _, loc := range server.localRibMap { + targetPeer := server.neighborMap[loc.OwnerName()] + if loc.isGlobal() || loc.OwnerName() == peer.config.NeighborAddress.String() { + continue + } + pathList, _ := loc.rib.DeletePathsforPeer(peer.peerInfo, rf) + pathList = dropSameAsPath(targetPeer.config.PeerAs, pathList) + if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED || len(pathList) == 0 { + continue + } + msgList := table.CreateUpdateMsgFromPaths(pathList) + msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + targetPeer.adjRib.UpdateOut(pathList) + } + } else { + loc := server.localRibMap[GLOBAL_RIB_NAME] + pathList, _ := loc.rib.DeletePathsforPeer(peer.peerInfo, rf) + if len(pathList) == 0 { + continue + } + msgList := table.CreateUpdateMsgFromPaths(pathList) + for _, targetPeer := range server.neighborMap { + if targetPeer.isRouteServerClient() || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + targetPeer.adjRib.UpdateOut(pathList) + msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + } + } + } + return msgs +} + +func applyPolicies(peer *Peer, loc *LocalRib, isExport bool, pathList []table.Path) []table.Path { + var defaultPolicy config.DefaultPolicyType + if isExport == true { + defaultPolicy = loc.defaultExportPolicy + } else { + defaultPolicy = loc.defaultImportPolicy + } + + ret := make([]table.Path, 0, len(pathList)) + for _, path := range pathList { + if !path.IsWithdraw() { + var applied bool = false + applied, path = loc.applyPolicies(isExport, path) + if applied { + if path == nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "Data": path, + }).Debug("Policy applied and rejected.") + continue + } + } else if defaultPolicy != config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "Data": path, + }).Debug("Default policy applied and rejected.") + continue + } + } + // FIXME: probably we already clone. + ret = append(ret, path.Clone(path.IsWithdraw())) } + return ret } -func sendServerMsgToRSClients(peerMap map[string]peerMapInfo, msg *serverMsg) { - for _, info := range peerMap { - if info.isRouteServerClient { - info.serverMsgCh <- msg +func (server *BgpServer) propagateUpdate(neighborAddress string, RouteServerClient bool, pathList []table.Path) []*SenderMsg { + msgs := make([]*SenderMsg, 0) + + if RouteServerClient { + for _, loc := range server.localRibMap { + targetPeer := server.neighborMap[loc.OwnerName()] + if loc.isGlobal() || loc.OwnerName() == neighborAddress { + continue + } + sendPathList, _ := loc.rib.ProcessPaths(applyPolicies(targetPeer, loc, false, dropSameAsPath(targetPeer.config.PeerAs, filterpath(targetPeer, pathList)))) + if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED || len(sendPathList) == 0 { + continue + } + sendPathList = applyPolicies(targetPeer, loc, true, sendPathList) + if len(sendPathList) == 0 { + continue + } + msgList := table.CreateUpdateMsgFromPaths(sendPathList) + targetPeer.adjRib.UpdateOut(sendPathList) + msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + } + } else { + globalLoc := server.localRibMap[GLOBAL_RIB_NAME] + sendPathList, _ := globalLoc.rib.ProcessPaths(pathList) + if len(sendPathList) == 0 { + return msgs + } + + for _, targetPeer := range server.neighborMap { + if targetPeer.isRouteServerClient() || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + f := filterpath(targetPeer, sendPathList) + for _, path := range f { + path.SetNexthop(targetPeer.config.LocalAddress) + } + targetPeer.adjRib.UpdateOut(f) + msgList := table.CreateUpdateMsgFromPaths(f) + msgs = append(msgs, newSenderMsg(targetPeer, msgList)) } } + return msgs +} + +func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan *fsmMsg) []*SenderMsg { + msgs := make([]*SenderMsg, 0) + + switch e.MsgType { + case FSM_MSG_STATE_CHANGE: + nextState := e.MsgData.(bgp.FSMState) + oldState := bgp.FSMState(peer.config.BgpNeighborCommonState.State) + go func(t *tomb.Tomb, addr string, oldState, newState bgp.FSMState) { + e := time.AfterFunc(time.Second*30, func() { log.Fatal("failed to free the fsm.h.t for ", addr, oldState, newState) }) + t.Wait() + e.Stop() + }(&peer.fsm.h.t, peer.config.NeighborAddress.String(), oldState, nextState) + peer.config.BgpNeighborCommonState.State = uint32(nextState) + peer.fsm.StateChange(nextState) + globalRib := server.localRibMap[GLOBAL_RIB_NAME] + + if oldState == bgp.BGP_FSM_ESTABLISHED { + t := time.Now() + if t.Sub(time.Unix(peer.config.BgpNeighborCommonState.Uptime, 0)) < FLOP_THRESHOLD { + peer.config.BgpNeighborCommonState.Flops++ + } + + for _, rf := range peer.configuredRFlist() { + peer.adjRib.DropAll(rf) + } + + msgs = append(msgs, server.dropPeerAllRoutes(peer)...) + } + + close(peer.outgoing) + peer.outgoing = make(chan *bgp.BGPMessage, 128) + if nextState == bgp.BGP_FSM_ESTABLISHED { + pathList := make([]table.Path, 0) + if peer.isRouteServerClient() { + loc := server.localRibMap[peer.config.NeighborAddress.String()] + pathList = applyPolicies(peer, loc, true, peer.getBests(loc)) + } else { + peer.config.LocalAddress = peer.fsm.LocalAddr() + for _, path := range peer.getBests(globalRib) { + p := path.Clone(path.IsWithdraw()) + p.SetNexthop(peer.config.LocalAddress) + pathList = append(pathList, p) + } + } + if len(pathList) > 0 { + peer.adjRib.UpdateOut(pathList) + msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList))) + } + } else { + peer.config.BgpNeighborCommonState.Downtime = time.Now().Unix() + } + // clear counter + if peer.fsm.adminState == ADMIN_STATE_DOWN { + peer.config.BgpNeighborCommonState = config.BgpNeighborCommonState{} + } + peer.startFSMHandler(incoming) + + case FSM_MSG_BGP_MESSAGE: + switch m := e.MsgData.(type) { + case *bgp.MessageError: + msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)})) + case *bgp.BGPMessage: + pathList, update := peer.handleBGPmessage(m) + if update == false { + if len(pathList) > 0 { + msgList := table.CreateUpdateMsgFromPaths(pathList) + msgs = append(msgs, newSenderMsg(peer, msgList)) + } + break + } + msgs = append(msgs, server.propagateUpdate(peer.config.NeighborAddress.String(), + peer.isRouteServerClient(), pathList)...) + default: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + "Data": e.MsgData, + }).Panic("unknonw msg type") + } + } + return msgs } func (server *BgpServer) SetGlobalType(g config.Global) { @@ -265,80 +559,483 @@ func (server *BgpServer) SetPolicy(pl config.RoutingPolicy) { func (server *BgpServer) handlePolicy(pl config.RoutingPolicy) { server.SetPolicy(pl) - msg := &serverMsg{ - msgType: SRV_MSG_POLICY_UPDATED, - msgData: server.policyMap, + for _, loc := range server.localRibMap { + if loc.isGlobal() { + continue + } + targetPeer := server.neighborMap[loc.OwnerName()] + loc.setPolicy(targetPeer, server.policyMap) + } +} + +func (server *BgpServer) checkNeighborRequest(grpcReq *GrpcRequest) (*Peer, error) { + remoteAddr := grpcReq.RemoteAddr + peer, found := server.neighborMap[remoteAddr] + if !found { + result := &GrpcResponse{} + result.ResponseErr = fmt.Errorf("Neighbor that has %v doesn't exist.", remoteAddr) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return nil, result.ResponseErr + } + return peer, nil +} + +func handleGlobalRibRequest(grpcReq *GrpcRequest, peerInfo *table.PeerInfo) []table.Path { + pathList := []table.Path{} + result := &GrpcResponse{} + + rf := grpcReq.RouteFamily + path, ok := grpcReq.Data.(*api.Path) + if !ok { + result.ResponseErr = fmt.Errorf("type assertion failed") + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + var isWithdraw bool + if grpcReq.RequestType == REQ_GLOBAL_DELETE { + isWithdraw = true + } + + var nlri bgp.AddrPrefixInterface + pattr := make([]bgp.PathAttributeInterface, 0) + pattr = append(pattr, bgp.NewPathAttributeOrigin(bgp.BGP_ORIGIN_ATTR_TYPE_IGP)) + asparam := bgp.NewAs4PathParam(bgp.BGP_ASPATH_ATTR_TYPE_SEQ, []uint32{peerInfo.AS}) + pattr = append(pattr, bgp.NewPathAttributeAsPath([]bgp.AsPathParamInterface{asparam})) + + switch rf { + case bgp.RF_IPv4_UC: + ip, net, _ := net.ParseCIDR(path.Nlri.Prefix) + if ip.To4() == nil { + result.ResponseErr = fmt.Errorf("Invalid ipv4 prefix: %s", path.Nlri.Prefix) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + ones, _ := net.Mask.Size() + nlri = &bgp.NLRInfo{ + IPAddrPrefix: *bgp.NewIPAddrPrefix(uint8(ones), ip.String()), + } + + pattr = append(pattr, bgp.NewPathAttributeNextHop("0.0.0.0")) + + case bgp.RF_IPv6_UC: + + ip, net, _ := net.ParseCIDR(path.Nlri.Prefix) + if ip.To16() == nil { + result.ResponseErr = fmt.Errorf("Invalid ipv6 prefix: %s", path.Nlri.Prefix) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + ones, _ := net.Mask.Size() + nlri = bgp.NewIPv6AddrPrefix(uint8(ones), ip.String()) + + pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("::", []bgp.AddrPrefixInterface{nlri})) + + case bgp.RF_EVPN: + mac, err := net.ParseMAC(path.Nlri.EvpnNlri.MacIpAdv.MacAddr) + if err != nil { + result.ResponseErr = fmt.Errorf("Invalid mac: %s", path.Nlri.EvpnNlri.MacIpAdv.MacAddr) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + ip := net.ParseIP(path.Nlri.EvpnNlri.MacIpAdv.IpAddr) + if ip == nil { + result.ResponseErr = fmt.Errorf("Invalid ip prefix: %s", path.Nlri.EvpnNlri.MacIpAdv.IpAddr) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + iplen := net.IPv4len * 8 + if ip.To4() == nil { + iplen = net.IPv6len * 8 + } + + macIpAdv := &bgp.EVPNMacIPAdvertisementRoute{ + RD: bgp.NewRouteDistinguisherTwoOctetAS(0, 0), + ESI: bgp.EthernetSegmentIdentifier{ + Type: bgp.ESI_ARBITRARY, + }, + MacAddressLength: 48, + MacAddress: mac, + IPAddressLength: uint8(iplen), + IPAddress: ip, + Labels: []uint32{0}, + } + nlri = bgp.NewEVPNNLRI(bgp.EVPN_ROUTE_TYPE_MAC_IP_ADVERTISEMENT, 0, macIpAdv) + pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("0.0.0.0", []bgp.AddrPrefixInterface{nlri})) + case bgp.RF_ENCAP: + endpoint := net.ParseIP(path.Nlri.Prefix) + if endpoint == nil { + result.ResponseErr = fmt.Errorf("Invalid endpoint ip address: %s", path.Nlri.Prefix) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + + } + nlri = bgp.NewEncapNLRI(endpoint.String()) + pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("0.0.0.0", []bgp.AddrPrefixInterface{nlri})) + + iterSubTlvs := func(subTlvs []*api.TunnelEncapSubTLV) { + for _, subTlv := range subTlvs { + if subTlv.Type == api.ENCAP_SUBTLV_TYPE_COLOR { + color := subTlv.Color + subTlv := &bgp.TunnelEncapSubTLV{ + Type: bgp.ENCAP_SUBTLV_TYPE_COLOR, + Value: &bgp.TunnelEncapSubTLVColor{color}, + } + tlv := &bgp.TunnelEncapTLV{ + Type: bgp.TUNNEL_TYPE_VXLAN, + Value: []*bgp.TunnelEncapSubTLV{subTlv}, + } + attr := bgp.NewPathAttributeTunnelEncap([]*bgp.TunnelEncapTLV{tlv}) + pattr = append(pattr, attr) + break + } + } + } + + iterTlvs := func(tlvs []*api.TunnelEncapTLV) { + for _, tlv := range tlvs { + if tlv.Type == api.TUNNEL_TYPE_VXLAN { + iterSubTlvs(tlv.SubTlv) + break + } + } + } + + func(attrs []*api.PathAttr) { + for _, attr := range attrs { + if attr.Type == api.BGP_ATTR_TYPE_TUNNEL_ENCAP { + iterTlvs(attr.TunnelEncap) + break + } + } + }(path.Attrs) + + case bgp.RF_RTC_UC: + var ec bgp.ExtendedCommunityInterface + target := path.Nlri.RtNlri.Target + ec_type := target.Type + ec_subtype := target.Subtype + switch ec_type { + case api.EXTENDED_COMMUNITIE_TYPE_TWO_OCTET_AS_SPECIFIC: + if target.Asn == 0 && target.LocalAdmin == 0 { + break + } + ec = &bgp.TwoOctetAsSpecificExtended{ + SubType: bgp.ExtendedCommunityAttrSubType(ec_subtype), + AS: uint16(target.Asn), + LocalAdmin: target.LocalAdmin, + IsTransitive: true, + } + default: + result.ResponseErr = fmt.Errorf("Invalid endpoint ip address: %s", path.Nlri.Prefix) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + + nlri = bgp.NewRouteTargetMembershipNLRI(peerInfo.AS, ec) + + pattr = append(pattr, bgp.NewPathAttributeMpReachNLRI("0.0.0.0", []bgp.AddrPrefixInterface{nlri})) + + default: + result.ResponseErr = fmt.Errorf("Unsupported address family: %s", rf) + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList + } + + p, err := table.CreatePath(peerInfo, nlri, pattr, isWithdraw, time.Now()) + if err != nil { + result.ResponseErr = err + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + return pathList } - sendServerMsgToAll(server.peerMap, msg) + return []table.Path{p} } -func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { +func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { + msgs := make([]*SenderMsg, 0) + switch grpcReq.RequestType { + case REQ_GLOBAL_RIB: + if t, ok := server.localRibMap[GLOBAL_RIB_NAME].rib.Tables[grpcReq.RouteFamily]; ok { + for _, dst := range t.GetDestinations() { + result := &GrpcResponse{} + result.Data = dst.ToApiStruct() + grpcReq.ResponseCh <- result + } + } + close(grpcReq.ResponseCh) + + case REQ_GLOBAL_ADD, REQ_GLOBAL_DELETE: + pi := &table.PeerInfo{ + AS: server.bgpConfig.Global.As, + LocalID: server.bgpConfig.Global.RouterId, + } + pathList := handleGlobalRibRequest(grpcReq, pi) + if len(pathList) > 0 { + server.propagateUpdate("", false, pathList) + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + } + case REQ_NEIGHBORS: - for _, info := range server.peerMap { + for _, peer := range server.neighborMap { result := &GrpcResponse{ - Data: info.peer.ToApiStruct(), + Data: peer.ToApiStruct(), } grpcReq.ResponseCh <- result } close(grpcReq.ResponseCh) + case REQ_NEIGHBOR: - remoteAddr := grpcReq.RemoteAddr - var result *GrpcResponse - info, found := server.peerMap[remoteAddr] - if found { - result = &GrpcResponse{ - Data: info.peer.ToApiStruct(), - } - } else { - result = &GrpcResponse{ - ResponseErr: fmt.Errorf("Neighbor that has %v doesn't exist.", remoteAddr), - } + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + result := &GrpcResponse{ + Data: peer.ToApiStruct(), } grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) - case REQ_GLOBAL_RIB, REQ_GLOBAL_ADD, REQ_GLOBAL_DELETE: - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - server.globalRib.serverMsgCh <- msg - case REQ_LOCAL_RIB, REQ_NEIGHBOR_SHUTDOWN, REQ_NEIGHBOR_RESET, - REQ_NEIGHBOR_SOFT_RESET, REQ_NEIGHBOR_SOFT_RESET_IN, REQ_NEIGHBOR_SOFT_RESET_OUT, - REQ_ADJ_RIB_IN, REQ_ADJ_RIB_OUT, - REQ_NEIGHBOR_ENABLE, REQ_NEIGHBOR_DISABLE, - REQ_NEIGHBOR_POLICY: - remoteAddr := grpcReq.RemoteAddr - result := &GrpcResponse{} - info, found := server.peerMap[remoteAddr] - if found { - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, + + case REQ_LOCAL_RIB: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + if peer.fsm.adminState != ADMIN_STATE_DOWN { + remoteAddr := grpcReq.RemoteAddr + if t, ok := server.localRibMap[remoteAddr].rib.Tables[grpcReq.RouteFamily]; ok { + for _, dst := range t.GetDestinations() { + result := &GrpcResponse{} + result.Data = dst.ToApiStruct() + grpcReq.ResponseCh <- result + } } - info.peer.serverMsgCh <- msg + } + close(grpcReq.ResponseCh) + + case REQ_ADJ_RIB_IN, REQ_ADJ_RIB_OUT: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + rf := grpcReq.RouteFamily + var paths []table.Path + + if grpcReq.RequestType == REQ_ADJ_RIB_IN { + paths = peer.adjRib.GetInPathList(rf) + log.Debugf("RouteFamily=%v adj-rib-in found : %d", rf.String(), len(paths)) } else { - result.ResponseErr = fmt.Errorf("Neighbor that has %v doesn't exist.", remoteAddr) + paths = peer.adjRib.GetOutPathList(rf) + log.Debugf("RouteFamily=%v adj-rib-out found : %d", rf.String(), len(paths)) + } + + for _, p := range paths { + result := &GrpcResponse{} + path := &api.Path{} + j, _ := json.Marshal(p) + err := json.Unmarshal(j, path) + if err != nil { + result.ResponseErr = err + } else { + result.Data = path + } grpcReq.ResponseCh <- result + } + close(grpcReq.ResponseCh) + + case REQ_NEIGHBOR_SHUTDOWN: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) + msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m})) + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + + case REQ_NEIGHBOR_RESET: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + peer.fsm.idleHoldTime = peer.config.Timers.IdleHoldTimeAfterReset + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) + msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m})) + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + + case REQ_NEIGHBOR_SOFT_RESET, REQ_NEIGHBOR_SOFT_RESET_IN: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + // soft-reconfiguration inbound + pathList := peer.adjRib.GetInPathList(grpcReq.RouteFamily) + msgs = append(msgs, server.propagateUpdate(peer.config.NeighborAddress.String(), + peer.isRouteServerClient(), pathList)...) + + if grpcReq.RequestType == REQ_NEIGHBOR_SOFT_RESET_IN { + grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) + break } - case REQ_NEIGHBOR_POLICY_ADD_IMPORT, REQ_NEIGHBOR_POLICY_ADD_EXPORT, REQ_NEIGHBOR_POLICY_DEL_IMPORT, REQ_NEIGHBOR_POLICY_DEL_EXPORT: - remoteAddr := grpcReq.RemoteAddr + fallthrough + case REQ_NEIGHBOR_SOFT_RESET_OUT: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + pathList := peer.adjRib.GetOutPathList(grpcReq.RouteFamily) + msgList := table.CreateUpdateMsgFromPaths(pathList) + msgs = append(msgs, newSenderMsg(peer, msgList)) + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + + case REQ_NEIGHBOR_ENABLE, REQ_NEIGHBOR_DISABLE: + peer, err1 := server.checkNeighborRequest(grpcReq) + if err1 != nil { + break + } + var err api.Error result := &GrpcResponse{} - info, found := server.peerMap[remoteAddr] - if found { - reqApplyPolicy := grpcReq.Data.(*api.ApplyPolicy) - grpcReq.Data = []interface{}{reqApplyPolicy, server.policyMap} - msg := &serverMsg{ - msgType: SRV_MSG_API, - msgData: grpcReq, - } - info.peer.serverMsgCh <- msg + if grpcReq.RequestType == REQ_NEIGHBOR_ENABLE { + select { + case peer.fsm.adminStateCh <- ADMIN_STATE_UP: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + }).Debug("ADMIN_STATE_UP requested") + err.Code = api.Error_SUCCESS + err.Msg = "ADMIN_STATE_UP" + default: + log.Warning("previous request is still remaining. : ", peer.config.NeighborAddress) + err.Code = api.Error_FAIL + err.Msg = "previous request is still remaining" + } } else { - result.ResponseErr = fmt.Errorf("Neighbor that has %v doesn't exist.", remoteAddr) - grpcReq.ResponseCh <- result - close(grpcReq.ResponseCh) + select { + case peer.fsm.adminStateCh <- ADMIN_STATE_DOWN: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.config.NeighborAddress, + }).Debug("ADMIN_STATE_DOWN requested") + err.Code = api.Error_SUCCESS + err.Msg = "ADMIN_STATE_DOWN" + default: + log.Warning("previous request is still remaining. : ", peer.config.NeighborAddress) + err.Code = api.Error_FAIL + err.Msg = "previous request is still remaining" + } } + result.Data = err + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + + case REQ_NEIGHBOR_POLICY: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + + resInPolicies := []*api.PolicyDefinition{} + resOutPolicies := []*api.PolicyDefinition{} + // Add importpolies that has been set in the configuration file to the list. + // However, peer haven't target importpolicy when add PolicyDefinition of name only to the list. + conInPolicyNames := peer.config.ApplyPolicy.ImportPolicies + loc := server.localRibMap[peer.config.NeighborAddress.String()] + for _, conInPolicyName := range conInPolicyNames { + match := false + for _, inPolicy := range loc.importPolicies { + if conInPolicyName == inPolicy.Name { + match = true + resInPolicies = append(resInPolicies, inPolicy.ToApiStruct()) + break + } + } + if !match { + resInPolicies = append(resInPolicies, &api.PolicyDefinition{PolicyDefinitionName: conInPolicyName}) + } + } + // Add importpolies that has been set in the configuration file to the list. + // However, peer haven't target importpolicy when add PolicyDefinition of name only to the list. + conOutPolicyNames := peer.config.ApplyPolicy.ExportPolicies + for _, conOutPolicyName := range conOutPolicyNames { + match := false + for _, outPolicy := range loc.exportPolicies { + if conOutPolicyName == outPolicy.Name { + match = true + resOutPolicies = append(resOutPolicies, outPolicy.ToApiStruct()) + break + } + } + if !match { + resOutPolicies = append(resOutPolicies, &api.PolicyDefinition{PolicyDefinitionName: conOutPolicyName}) + } + } + defaultInPolicy := policy.ROUTE_REJECT + defaultOutPolicy := policy.ROUTE_REJECT + if loc.defaultImportPolicy == config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { + defaultInPolicy = policy.ROUTE_ACCEPT + } + if loc.defaultExportPolicy == config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE { + defaultOutPolicy = policy.ROUTE_ACCEPT + } + result := &GrpcResponse{ + Data: &api.ApplyPolicy{ + DefaultImportPolicy: defaultInPolicy, + ImportPolicies: resInPolicies, + DefaultExportPolicy: defaultOutPolicy, + ExportPolicies: resOutPolicies, + }, + } + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + + case REQ_NEIGHBOR_POLICY_ADD_IMPORT, REQ_NEIGHBOR_POLICY_ADD_EXPORT, REQ_NEIGHBOR_POLICY_DEL_IMPORT, REQ_NEIGHBOR_POLICY_DEL_EXPORT: + peer, err := server.checkNeighborRequest(grpcReq) + if err != nil { + break + } + reqApplyPolicy := grpcReq.Data.(*api.ApplyPolicy) + grpcReq.Data = []interface{}{reqApplyPolicy, server.policyMap} + + reqPolicyMap := server.policyMap + applyPolicy := &peer.config.ApplyPolicy + var defInPolicy, defOutPolicy config.DefaultPolicyType + if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_ADD_IMPORT { + if reqApplyPolicy.DefaultImportPolicy != policy.ROUTE_ACCEPT { + defInPolicy = config.DEFAULT_POLICY_TYPE_REJECT_ROUTE + } + peer.config.ApplyPolicy.DefaultImportPolicy = defInPolicy + applyPolicy.ImportPolicies = policy.PoliciesToString(reqApplyPolicy.ImportPolicies) + } else if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_ADD_EXPORT { + if reqApplyPolicy.DefaultExportPolicy != policy.ROUTE_ACCEPT { + defOutPolicy = config.DEFAULT_POLICY_TYPE_REJECT_ROUTE + } + peer.config.ApplyPolicy.DefaultExportPolicy = defOutPolicy + applyPolicy.ExportPolicies = policy.PoliciesToString(reqApplyPolicy.ExportPolicies) + } else if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_DEL_IMPORT { + peer.config.ApplyPolicy.DefaultImportPolicy = config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE + peer.config.ApplyPolicy.ImportPolicies = make([]string, 0) + } else if grpcReq.RequestType == REQ_NEIGHBOR_POLICY_DEL_EXPORT { + peer.config.ApplyPolicy.DefaultExportPolicy = config.DEFAULT_POLICY_TYPE_ACCEPT_ROUTE + peer.config.ApplyPolicy.ExportPolicies = make([]string, 0) + } + loc := server.localRibMap[peer.config.NeighborAddress.String()] + loc.setPolicy(peer, reqPolicyMap) + grpcReq.ResponseCh <- &GrpcResponse{} + close(grpcReq.ResponseCh) + case REQ_POLICY_PREFIXES: info := server.routingPolicy.DefinedSets.PrefixSetList result := &GrpcResponse{} @@ -355,6 +1052,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { grpcReq.ResponseCh <- result } close(grpcReq.ResponseCh) + case REQ_POLICY_PREFIX: name := grpcReq.Data.(string) info := server.routingPolicy.DefinedSets.PrefixSetList @@ -376,6 +1074,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { grpcReq.ResponseCh <- result } close(grpcReq.ResponseCh) + case REQ_POLICY_PREFIX_ADD: reqPrefixSet := grpcReq.Data.(*api.PrefixSet) conPrefixSetList := server.routingPolicy.DefinedSets.PrefixSetList @@ -400,6 +1099,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { server.handlePolicy(server.routingPolicy) grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) + case REQ_POLICY_PREFIX_DELETE: reqPrefixSet := grpcReq.Data.(*api.PrefixSet) conPrefixSetList := server.routingPolicy.DefinedSets.PrefixSetList @@ -407,7 +1107,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { isReqPrefixSet, prefixSet := policy.PrefixSetToConfigStruct(reqPrefixSet) if isReqPrefixSet { // If only name of the PrefixSet is same, delete all of the elements of the PrefixSet. - // If the same element PrefixSet, delete the it's element from PrefixSet. + //If the same element PrefixSet, delete the it's element from PrefixSet. idxPrefixSet, idxPrefix := policy.IndexOfPrefixSet(conPrefixSetList, prefixSet) prefix := prefixSet.PrefixList[0] if idxPrefixSet == -1 { @@ -440,12 +1140,14 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { server.handlePolicy(server.routingPolicy) grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) + case REQ_POLICY_PREFIXES_DELETE: result := &GrpcResponse{} server.routingPolicy.DefinedSets.PrefixSetList = make([]config.PrefixSet, 0) server.handlePolicy(server.routingPolicy) grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) + case REQ_POLICY_NEIGHBORS: info := server.routingPolicy.DefinedSets.NeighborSetList result := &GrpcResponse{} @@ -462,6 +1164,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { grpcReq.ResponseCh <- result } close(grpcReq.ResponseCh) + case REQ_POLICY_NEIGHBOR: name := grpcReq.Data.(string) info := server.routingPolicy.DefinedSets.NeighborSetList @@ -483,6 +1186,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { grpcReq.ResponseCh <- result } close(grpcReq.ResponseCh) + case REQ_POLICY_NEIGHBOR_ADD: reqNeighborSet := grpcReq.Data.(*api.NeighborSet) conNeighborSetList := server.routingPolicy.DefinedSets.NeighborSetList @@ -677,4 +1381,5 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) } + return msgs } |