summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/server.go')
-rw-r--r--pkg/server/server.go300
1 files changed, 251 insertions, 49 deletions
diff --git a/pkg/server/server.go b/pkg/server/server.go
index b52ad284..78f0c254 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -195,7 +195,10 @@ func (server *BgpServer) Serve() {
}).Warnf("Can't find the neighbor %s", e.MsgSrc)
return
}
- if e.Version != peer.fsm.version {
+ peer.fsm.lock.RLock()
+ versionMismatch := e.Version != peer.fsm.version
+ peer.fsm.lock.RUnlock()
+ if versionMismatch {
log.WithFields(log.Fields{
"Topic": "Peer",
}).Debug("FSM version inconsistent")
@@ -211,15 +214,23 @@ func (server *BgpServer) Serve() {
remoteAddr := ipaddr.String()
peer, found := server.neighborMap[remoteAddr]
if found {
- if peer.fsm.adminState != ADMIN_STATE_UP {
+ peer.fsm.lock.RLock()
+ adminStateNotUp := peer.fsm.adminState != ADMIN_STATE_UP
+ peer.fsm.lock.RUnlock()
+ if adminStateNotUp {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Remote Addr": remoteAddr,
"Admin State": peer.fsm.adminState,
}).Debug("New connection for non admin-state-up peer")
+ peer.fsm.lock.RUnlock()
conn.Close()
return
}
+ peer.fsm.lock.RLock()
+ localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
+ peer.fsm.lock.RUnlock()
localAddrValid := func(laddr string) bool {
if laddr == "0.0.0.0" || laddr == "::" {
return true
@@ -241,7 +252,7 @@ func (server *BgpServer) Serve() {
return false
}
return true
- }(peer.fsm.pConf.Transport.Config.LocalAddress)
+ }(localAddr)
if !localAddrValid {
conn.Close()
@@ -269,7 +280,10 @@ func (server *BgpServer) Serve() {
conn.Close()
return
}
- server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): peer.fsm.pConf.ApplyPolicy})
+ peer.fsm.lock.RLock()
+ policy := peer.fsm.pConf.ApplyPolicy
+ peer.fsm.lock.RUnlock()
+ server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
server.neighborMap[remoteAddr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
@@ -357,12 +371,19 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path {
if path == nil {
return nil
}
- if _, ok := peer.fsm.rfMap[path.GetRouteFamily()]; !ok {
+
+ peer.fsm.lock.RLock()
+ _, ok := peer.fsm.rfMap[path.GetRouteFamily()]
+ peer.fsm.lock.RUnlock()
+ if !ok {
return nil
}
//RFC4684 Constrained Route Distribution
- if _, y := peer.fsm.rfMap[bgp.RF_RTC_UC]; y && path.GetRouteFamily() != bgp.RF_RTC_UC {
+ peer.fsm.lock.RLock()
+ _, y := peer.fsm.rfMap[bgp.RF_RTC_UC]
+ peer.fsm.lock.RUnlock()
+ if y && path.GetRouteFamily() != bgp.RF_RTC_UC {
ignore := true
for _, ext := range path.GetExtCommunities() {
for _, p := range peer.adjRibIn.PathList([]bgp.RouteFamily{bgp.RF_RTC_UC}, true) {
@@ -400,7 +421,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path {
// RFC4456 8. Avoiding Routing Information Loops
// A router that recognizes the ORIGINATOR_ID attribute SHOULD
// ignore a route received with its BGP Identifier as the ORIGINATOR_ID.
- if id := path.GetOriginatorID(); peer.fsm.gConf.Config.RouterId == id.String() {
+ peer.fsm.lock.RLock()
+ routerId := peer.fsm.gConf.Config.RouterId
+ peer.fsm.lock.RUnlock()
+ if id := path.GetOriginatorID(); routerId == id.String() {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -417,7 +441,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path {
// If the local CLUSTER_ID is found in the CLUSTER_LIST,
// the advertisement received SHOULD be ignored.
for _, clusterID := range path.GetClusterList() {
- if clusterID.Equal(peer.fsm.peerInfo.RouteReflectorClusterID) {
+ peer.fsm.lock.RLock()
+ rrClusterID := peer.fsm.peerInfo.RouteReflectorClusterID
+ peer.fsm.lock.RUnlock()
+ if clusterID.Equal(rrClusterID) {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -472,11 +499,13 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
// assumes "path" was already sent before. This assumption avoids the
// infinite UPDATE loop between Route Reflector and its clients.
if path.IsLocal() && path.Equal(old) {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
"Path": path,
}).Debug("given rtm nlri is already sent, skipping to advertise")
+ peer.fsm.lock.RUnlock()
return nil
}
@@ -512,11 +541,14 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
// only allow vpnv4 and vpnv6 paths to be advertised to VRFed neighbors.
// also check we can import this path using table.CanImportToVrf()
// if we can, make it local path by calling (*Path).ToLocal()
- if path != nil && peer.fsm.pConf.Config.Vrf != "" {
+ peer.fsm.lock.RLock()
+ peerVrf := peer.fsm.pConf.Config.Vrf
+ peer.fsm.lock.RUnlock()
+ if path != nil && peerVrf != "" {
if f := path.GetRouteFamily(); f != bgp.RF_IPv4_VPN && f != bgp.RF_IPv6_VPN {
return nil
}
- vrf := peer.localRib.Vrfs[peer.fsm.pConf.Config.Vrf]
+ vrf := peer.localRib.Vrfs[peerVrf]
if table.CanImportToVrf(vrf, path) {
path = path.ToLocal()
} else {
@@ -525,14 +557,17 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
}
// replace-peer-as handling
+ peer.fsm.lock.RLock()
if path != nil && !path.IsWithdraw && peer.fsm.pConf.AsPathOptions.State.ReplacePeerAs {
path = path.ReplaceAS(peer.fsm.pConf.Config.LocalAs, peer.fsm.pConf.Config.PeerAs)
}
+ peer.fsm.lock.RUnlock()
if path = filterpath(peer, path, old); path == nil {
return nil
}
+ peer.fsm.lock.RLock()
options := &table.PolicyOptions{
Info: peer.fsm.peerInfo,
OldNextHop: path.GetNexthop(),
@@ -542,6 +577,7 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
if v := s.roaManager.validate(path); v != nil {
options.ValidationResult = v
}
+ peer.fsm.lock.RUnlock()
path = peer.policy.ApplyPolicy(peer.TableID(), table.POLICY_DIRECTION_EXPORT, path, options)
// When 'path' is filtered (path == nil), check 'old' has been sent to this peer.
@@ -616,10 +652,14 @@ func (server *BgpServer) notifyBestWatcher(best []*table.Path, multipath [][]*ta
func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
// create copy which can be access to without mutex
+ peer.fsm.lock.RLock()
conf := *peer.fsm.pConf
+ peerAfiSafis := peer.fsm.pConf.AfiSafis
+ peerCapMap := peer.fsm.capMap
+ peer.fsm.lock.RUnlock()
- conf.AfiSafis = make([]config.AfiSafi, len(peer.fsm.pConf.AfiSafis))
- for i, af := range peer.fsm.pConf.AfiSafis {
+ conf.AfiSafis = make([]config.AfiSafi, len(peerAfiSafis))
+ for i, af := range peerAfiSafis {
conf.AfiSafis[i] = af
conf.AfiSafis[i].AddPaths.State.Receive = peer.isAddPathReceiveEnabled(af.State.Family)
if peer.isAddPathSendEnabled(af.State.Family) {
@@ -629,8 +669,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
}
}
- remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peer.fsm.capMap))
- for _, caps := range peer.fsm.capMap {
+ remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peerCapMap))
+ for _, caps := range peerCapMap {
for _, m := range caps {
// need to copy all values here
buf, _ := m.Serialize()
@@ -638,6 +678,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
remoteCap = append(remoteCap, c)
}
}
+
+ peer.fsm.lock.RLock()
conf.State.RemoteCapabilityList = remoteCap
conf.State.LocalCapabilityList = capabilitiesFromConfig(peer.fsm.pConf)
@@ -663,6 +705,7 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
conf.State.ReceivedOpenMessage, _ = bgp.ParseBGPMessage(buf)
conf.State.RemoteRouterId = peer.fsm.peerInfo.ID.To4().String()
}
+ peer.fsm.lock.RUnlock()
return &conf
}
@@ -670,10 +713,12 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta
if !server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) || peer == nil {
return
}
+
cloned := clonePathList(pathList)
if len(cloned) == 0 {
return
}
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &WatchEventUpdate{
@@ -690,6 +735,7 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta
PathList: cloned,
Neighbor: server.ToConfig(peer, false),
}
+ peer.fsm.lock.RUnlock()
server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev)
}
@@ -697,10 +743,12 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t
if !server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) || peer == nil {
return
}
+
cloned := clonePathList(pathList)
if len(cloned) == 0 {
return
}
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &WatchEventUpdate{
@@ -715,6 +763,7 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t
PathList: cloned,
Neighbor: server.ToConfig(peer, false),
}
+ peer.fsm.lock.RUnlock()
server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev)
}
@@ -722,6 +771,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState {
_, rport := peer.fsm.RemoteHostPort()
laddr, lport := peer.fsm.LocalHostPort()
sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ peer.fsm.lock.RLock()
recvOpen := peer.fsm.recvOpen
e := &WatchEventPeerState{
PeerAS: peer.fsm.peerInfo.AS,
@@ -738,6 +788,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState {
Timestamp: time.Now(),
PeerInterface: peer.fsm.pConf.Config.NeighborInterface,
}
+ peer.fsm.lock.RUnlock()
if m != nil {
e.StateReason = m.StateReason
@@ -746,7 +797,9 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState {
}
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e *FsmMsg) {
+ peer.fsm.lock.RLock()
newState := peer.fsm.state
+ peer.fsm.lock.RUnlock()
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, newWatchEventPeerState(peer, e))
}
@@ -754,6 +807,7 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e
func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, msg *bgp.BGPMessage, isSent bool) {
// validation should be done in the caller of this function
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &WatchEventMessage{
@@ -767,6 +821,7 @@ func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, m
Timestamp: timestamp,
IsSent: isSent,
}
+ peer.fsm.lock.RUnlock()
if !isSent {
server.notifyWatcher(WATCH_EVENT_TYPE_RECV_MSG, ev)
}
@@ -806,14 +861,20 @@ func (s *BgpServer) getBestFromLocal(peer *Peer, rfList []bgp.RouteFamily) ([]*t
}
func (s *BgpServer) processOutgoingPaths(peer *Peer, paths, olds []*table.Path) []*table.Path {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
return nil
}
- if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+ if localRestarting {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Debug("now syncing, suppress sending updates")
+ peer.fsm.lock.RUnlock()
return nil
}
@@ -835,7 +896,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path {
m := e.MsgData.(*bgp.BGPMessage)
rr := m.Body.(*bgp.BGPRouteRefresh)
rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI)
- if _, ok := peer.fsm.rfMap[rf]; !ok {
+
+ peer.fsm.lock.RLock()
+ _, ok := peer.fsm.rfMap[rf]
+ peer.fsm.lock.RUnlock()
+ if !ok {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -843,7 +908,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path {
}).Warn("Route family isn't supported")
return nil
}
- if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok {
+
+ peer.fsm.lock.RLock()
+ _, ok = peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]
+ peer.fsm.lock.RUnlock()
+ if !ok {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -861,7 +930,12 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path {
func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) {
rs := peer != nil && peer.isRouteServerClient()
- vrf := !rs && peer != nil && peer.fsm.pConf.Config.Vrf != ""
+ vrf := false
+ if peer != nil {
+ peer.fsm.lock.RLock()
+ vrf = !rs && peer.fsm.pConf.Config.Vrf != ""
+ peer.fsm.lock.RUnlock()
+ }
tableId := table.GLOBAL_RIB_NAME
rib := server.globalRib
@@ -872,13 +946,18 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) {
for _, path := range pathList {
if vrf {
- path = path.ToGlobal(rib.Vrfs[peer.fsm.pConf.Config.Vrf])
+ peer.fsm.lock.RLock()
+ peerVrf := peer.fsm.pConf.Config.Vrf
+ peer.fsm.lock.RUnlock()
+ path = path.ToGlobal(rib.Vrfs[peerVrf])
}
policyOptions := &table.PolicyOptions{}
if !rs && peer != nil {
+ peer.fsm.lock.RLock()
policyOptions.Info = peer.fsm.peerInfo
+ peer.fsm.lock.RUnlock()
}
if v := server.roaManager.validate(path); v != nil {
policyOptions.ValidationResult = v
@@ -949,12 +1028,16 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) {
}
func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) {
+ peer.fsm.lock.RLock()
+ peerInfo := peer.fsm.peerInfo
+ peer.fsm.lock.RUnlock()
+
rib := server.globalRib
if peer.isRouteServerClient() {
rib = server.rsRib
}
for _, family := range peer.toGlobalFamilies(families) {
- for _, path := range rib.GetPathListByPeer(peer.fsm.peerInfo, family) {
+ for _, path := range rib.GetPathListByPeer(peerInfo, family) {
p := path.Clone(true)
if dsts := rib.Update(p); len(dsts) > 0 {
server.propagateUpdateToNeighbors(peer, p, dsts, false)
@@ -995,7 +1078,10 @@ func (server *BgpServer) propagateUpdateToNeighbors(source *Peer, newPath *table
continue
}
f := func() bgp.RouteFamily {
- if targetPeer.fsm.pConf.Config.Vrf != "" {
+ targetPeer.fsm.lock.RLock()
+ peerVrf := targetPeer.fsm.pConf.Config.Vrf
+ targetPeer.fsm.lock.RUnlock()
+ if peerVrf != "" {
switch family {
case bgp.RF_IPv4_VPN:
return bgp.RF_IPv4_UC
@@ -1047,19 +1133,31 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
switch e.MsgType {
case FSM_MSG_STATE_CHANGE:
nextState := e.MsgData.(bgp.FSMState)
+ peer.fsm.lock.Lock()
oldState := bgp.FSMState(peer.fsm.pConf.State.SessionState.ToInt())
peer.fsm.pConf.State.SessionState = config.IntToSessionStateMap[int(nextState)]
+ peer.fsm.lock.Unlock()
+
peer.fsm.StateChange(nextState)
+ peer.fsm.lock.RLock()
+ nextStateIdle := peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE
+ peer.fsm.lock.RUnlock()
+
// PeerDown
if oldState == bgp.BGP_FSM_ESTABLISHED {
t := time.Now()
+ peer.fsm.lock.Lock()
if t.Sub(time.Unix(peer.fsm.pConf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD {
peer.fsm.pConf.State.Flops++
}
+ graceful := peer.fsm.reason.Type == FSM_GRACEFUL_RESTART
+ peer.fsm.lock.Unlock()
var drop []bgp.RouteFamily
- if peer.fsm.reason.Type == FSM_GRACEFUL_RESTART {
+ if graceful {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = true
+ peer.fsm.lock.Unlock()
var p []bgp.RouteFamily
p, drop = peer.forwardingPreservedFamilies()
server.propagateUpdate(peer, peer.StaleAll(p))
@@ -1069,19 +1167,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
peer.prefixLimitWarned = make(map[bgp.RouteFamily]bool)
peer.DropAll(drop)
server.dropPeerAllRoutes(peer, drop)
+
+ peer.fsm.lock.Lock()
if peer.fsm.pConf.Config.PeerAs == 0 {
peer.fsm.pConf.State.PeerAs = 0
peer.fsm.peerInfo.AS = 0
}
+ peer.fsm.lock.Unlock()
+
if peer.isDynamicNeighbor() {
peer.stopPeerRestarting()
go peer.stopFSM()
+ peer.fsm.lock.RLock()
delete(server.neighborMap, peer.fsm.pConf.State.NeighborAddress)
+ peer.fsm.lock.RUnlock()
server.broadcastPeerState(peer, oldState, e)
return
}
- } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE {
- if peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled {
+ } else if nextStateIdle {
+ peer.fsm.lock.RLock()
+ longLivedEnabled := peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled
+ peer.fsm.lock.RUnlock()
+ if longLivedEnabled {
llgr, no_llgr := peer.llgrFamilies()
peer.DropAll(no_llgr)
@@ -1141,7 +1248,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// If the session does not get re-established within the "Restart Time"
// that the peer advertised previously, the Receiving Speaker MUST
// delete all the stale routes from the peer that it is retaining.
+ peer.fsm.lock.Lock()
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false
+ peer.fsm.lock.Unlock()
peer.DropAll(peer.configuredRFlist())
server.dropPeerAllRoutes(peer, peer.configuredRFlist())
}
@@ -1153,19 +1262,25 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
// may include zone info
+ peer.fsm.lock.Lock()
peer.fsm.pConf.Transport.State.LocalAddress = laddr
// exclude zone info
ipaddr, _ := net.ResolveIPAddr("ip", laddr)
peer.fsm.peerInfo.LocalAddress = ipaddr.IP
+ neighborAddress := peer.fsm.pConf.State.NeighborAddress
+ peer.fsm.lock.Unlock()
deferralExpiredFunc := func(family bgp.RouteFamily) func() {
return func() {
server.mgmtOperation(func() error {
- server.softResetOut(peer.fsm.pConf.State.NeighborAddress, family, true)
+ server.softResetOut(neighborAddress, family, true)
return nil
}, false)
}
}
- if !peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+ peer.fsm.lock.RLock()
+ notLocalRestarting := !peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if notLocalRestarting {
// When graceful-restart cap (which means intention
// of sending EOR) and route-target address family are negotiated,
// send route-target NLRIs first, and wait to send others
@@ -1176,8 +1291,12 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// waiting sending non-route-target NLRIs since the peer won't send
// any routes (and EORs) before we send ours (or deferral-timer expires).
var pathList []*table.Path
+ peer.fsm.lock.RLock()
_, y := peer.fsm.rfMap[bgp.RF_RTC_UC]
- if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !peer.fsm.pConf.GracefulRestart.State.PeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 {
+ c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC)
+ notPeerRestarting := !peer.fsm.pConf.GracefulRestart.State.PeerRestarting
+ peer.fsm.lock.RUnlock()
+ if y && notPeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 {
pathList, _ = server.getBestFromLocal(peer, []bgp.RouteFamily{bgp.RF_RTC_UC})
t := c.RouteTargetMembership.Config.DeferralTime
for _, f := range peer.negotiatedRFList() {
@@ -1211,7 +1330,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
}()
if allEnd {
for _, p := range server.neighborMap {
+ p.fsm.lock.Lock()
p.fsm.pConf.GracefulRestart.State.LocalRestarting = false
+ p.fsm.lock.Unlock()
if !p.isGracefulRestartEnabled() {
continue
}
@@ -1224,7 +1345,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
"Topic": "Server",
}).Info("sync finished")
} else {
+ peer.fsm.lock.RLock()
deferral := peer.fsm.pConf.GracefulRestart.Config.DeferralTime
+ peer.fsm.lock.RUnlock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -1236,7 +1359,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
if server.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE {
die := true
for _, p := range server.neighborMap {
- if p.fsm.state != bgp.BGP_FSM_IDLE {
+ p.fsm.lock.RLock()
+ stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE
+ p.fsm.lock.RUnlock()
+ if stateNotIdle {
die = false
break
}
@@ -1245,19 +1371,30 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
server.shutdownWG.Done()
}
}
+ peer.fsm.lock.Lock()
peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix()
+ peer.fsm.lock.Unlock()
}
// clear counter
- if peer.fsm.adminState == ADMIN_STATE_DOWN {
+ peer.fsm.lock.RLock()
+ adminStateDown := peer.fsm.adminState == ADMIN_STATE_DOWN
+ peer.fsm.lock.RUnlock()
+ if adminStateDown {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.State = config.NeighborState{}
peer.fsm.pConf.State.NeighborAddress = peer.fsm.pConf.Config.NeighborAddress
peer.fsm.pConf.State.PeerAs = peer.fsm.pConf.Config.PeerAs
peer.fsm.pConf.Timers.State = config.TimersState{}
+ peer.fsm.lock.Unlock()
}
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, oldState, e)
case FSM_MSG_ROUTE_REFRESH:
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime
+ peer.fsm.lock.RUnlock()
+ if notEstablished || beforeUptime {
return
}
if paths := server.handleRouteRefresh(peer, e); len(paths) > 0 {
@@ -1271,7 +1408,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
return
case *bgp.BGPMessage:
server.notifyRecvMessageWatcher(peer, e.timestamp, m)
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime
+ peer.fsm.lock.RUnlock()
+ if notEstablished || beforeUptime {
return
}
pathList, eor, notification := peer.handleUpdate(e)
@@ -1287,15 +1428,20 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
server.propagateUpdate(peer, pathList)
}
+ peer.fsm.lock.RLock()
+ peerAfiSafis := peer.fsm.pConf.AfiSafis
+ peer.fsm.lock.RUnlock()
if len(eor) > 0 {
rtc := false
for _, f := range eor {
if f == bgp.RF_RTC_UC {
rtc = true
}
- for i, a := range peer.fsm.pConf.AfiSafis {
+ for i, a := range peerAfiSafis {
if a.State.Family == f {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.AfiSafis[i].MpGracefulRestart.State.EndOfRibReceived = true
+ peer.fsm.lock.Unlock()
}
}
}
@@ -1307,7 +1453,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// End-of-RIB marker from all its peers (excluding the ones with the
// "Restart State" bit set in the received capability and excluding the
// ones that do not advertise the graceful restart capability) or ...snip...
- if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+
+ peer.fsm.lock.RLock()
+ localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if localRestarting {
allEnd := func() bool {
for _, p := range server.neighborMap {
if !p.recvedAllEOR() {
@@ -1318,7 +1468,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
}()
if allEnd {
for _, p := range server.neighborMap {
+ p.fsm.lock.Lock()
p.fsm.pConf.GracefulRestart.State.LocalRestarting = false
+ p.fsm.lock.Unlock()
if !p.isGracefulRestartEnabled() {
continue
}
@@ -1336,14 +1488,19 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// we don't delay non-route-target NLRIs when local-restarting
rtc = false
}
- if peer.fsm.pConf.GracefulRestart.State.PeerRestarting {
+ peer.fsm.lock.RLock()
+ peerRestarting := peer.fsm.pConf.GracefulRestart.State.PeerRestarting
+ peer.fsm.lock.RUnlock()
+ if peerRestarting {
if peer.recvedAllEOR() {
peer.stopPeerRestarting()
pathList := peer.adjRibIn.DropStale(peer.configuredRFlist())
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Debugf("withdraw %d stale routes", len(pathList))
+ peer.fsm.lock.RUnlock()
server.propagateUpdate(peer, pathList)
}
@@ -1353,7 +1510,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// received EOR of route-target address family
// outbound filter is now ready, let's flash non-route-target NLRIs
- if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 {
+ peer.fsm.lock.RLock()
+ c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC)
+ peer.fsm.lock.RUnlock()
+ if rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -1442,11 +1602,13 @@ func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) error {
ap := make(map[string]config.ApplyPolicy, len(s.neighborMap)+1)
ap[table.GLOBAL_RIB_NAME] = s.bgpConfig.Global.ApplyPolicy
for _, peer := range s.neighborMap {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Info("call set policy")
ap[peer.ID()] = peer.fsm.pConf.ApplyPolicy
+ peer.fsm.lock.RUnlock()
}
return s.policy.Reset(&policy, ap)
}, false)
@@ -1731,7 +1893,10 @@ func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInte
func (s *BgpServer) DeleteVrf(name string) error {
return s.mgmtOperation(func() error {
for _, n := range s.neighborMap {
- if n.fsm.pConf.Config.Vrf == name {
+ n.fsm.lock.RLock()
+ peerVrf := n.fsm.pConf.Config.Vrf
+ n.fsm.lock.RUnlock()
+ if peerVrf == name {
return fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID())
}
}
@@ -1792,7 +1957,11 @@ func (s *BgpServer) softResetIn(addr string, family bgp.RouteFamily) error {
// route should be excluded from the Phase 2 decision function.
isLooped := false
if aspath := path.GetAsPath(); aspath != nil {
- isLooped = hasOwnASLoop(peer.fsm.peerInfo.LocalAS, int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs), aspath)
+ peer.fsm.lock.RLock()
+ localAS := peer.fsm.peerInfo.LocalAS
+ allowOwnAS := int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs)
+ peer.fsm.lock.RUnlock()
+ isLooped = hasOwnASLoop(localAS, allowOwnAS, aspath)
}
if path.IsAsLooped() != isLooped {
// can't modify the existing one. needs to create one
@@ -1816,21 +1985,30 @@ func (s *BgpServer) softResetOut(addr string, family bgp.RouteFamily, deferral b
return err
}
for _, peer := range peers {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
continue
}
families := familiesForSoftreset(peer, family)
if deferral {
+ peer.fsm.lock.RLock()
_, y := peer.fsm.rfMap[bgp.RF_RTC_UC]
- if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+ c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC)
+ restarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if restarting {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.GracefulRestart.State.LocalRestarting = false
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
"Families": families,
}).Debug("deferral timer expired")
- } else if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !c.MpGracefulRestart.State.EndOfRibReceived {
+ peer.fsm.lock.Unlock()
+ } else if y && !c.MpGracefulRestart.State.EndOfRibReceived {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -2038,7 +2216,10 @@ func (s *BgpServer) GetNeighbor(address string, getAdvertised bool) (l []*config
s.mgmtOperation(func() error {
l = make([]*config.Neighbor, 0, len(s.neighborMap))
for k, peer := range s.neighborMap {
- if address != "" && address != k && address != peer.fsm.pConf.Config.NeighborInterface {
+ peer.fsm.lock.RLock()
+ neighborIface := peer.fsm.pConf.Config.NeighborInterface
+ peer.fsm.lock.RUnlock()
+ if address != "" && address != k && address != neighborIface {
continue
}
l = append(l, s.ToConfig(peer, getAdvertised))
@@ -2222,7 +2403,10 @@ func (s *BgpServer) DeletePeerGroup(c *config.PeerGroup) error {
return s.mgmtOperation(func() error {
name := c.Config.PeerGroupName
for _, n := range s.neighborMap {
- if n.fsm.pConf.Config.PeerGroup == name {
+ n.fsm.lock.RLock()
+ peerGroup := n.fsm.pConf.Config.PeerGroup
+ n.fsm.lock.RUnlock()
+ if peerGroup == name {
return fmt.Errorf("failed to delete peer-group %s: neighbor %s is in use", name, n.ID())
}
}
@@ -2411,7 +2595,9 @@ func (s *BgpServer) ResetNeighbor(addr, communication string) error {
}
peers, _ := s.addrToPeers(addr)
for _, peer := range peers {
+ peer.fsm.lock.Lock()
peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset
+ peer.fsm.lock.Unlock()
}
return nil
}, true)
@@ -2426,12 +2612,16 @@ func (s *BgpServer) setAdminState(addr, communication string, enable bool) error
f := func(stateOp *AdminStateOperation, message string) {
select {
case peer.fsm.adminStateCh <- *stateOp:
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Debug(message)
+ peer.fsm.lock.RUnlock()
default:
+ peer.fsm.lock.RLock()
log.Warning("previous request is still remaining. : ", peer.fsm.pConf.State.NeighborAddress)
+ peer.fsm.lock.RUnlock()
}
}
if enable {
@@ -2942,7 +3132,10 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
if w.opts.initPeerState {
for _, peer := range s.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
continue
}
w.notify(newWatchEventPeerState(peer, nil))
@@ -2956,14 +3149,18 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
if w.opts.initUpdate {
for _, peer := range s.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
continue
}
configNeighbor := w.s.ToConfig(peer, false)
for _, rf := range peer.configuredRFlist() {
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- w.notify(&WatchEventUpdate{
+ update := &WatchEventUpdate{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
@@ -2974,11 +3171,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
PostPolicy: false,
Neighbor: configNeighbor,
PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false),
- })
+ }
+ peer.fsm.lock.RUnlock()
+ w.notify(update)
eor := bgp.NewEndOfRib(rf)
eorBuf, _ := eor.Serialize()
- w.notify(&WatchEventUpdate{
+ peer.fsm.lock.RLock()
+ update = &WatchEventUpdate{
Message: eor,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
@@ -2991,7 +3191,9 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
Payload: eorBuf,
PostPolicy: false,
Neighbor: configNeighbor,
- })
+ }
+ peer.fsm.lock.RUnlock()
+ w.notify(update)
}
}
}