diff options
Diffstat (limited to 'pkg/server/server.go')
-rw-r--r-- | pkg/server/server.go | 300 |
1 files changed, 251 insertions, 49 deletions
diff --git a/pkg/server/server.go b/pkg/server/server.go index b52ad284..78f0c254 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -195,7 +195,10 @@ func (server *BgpServer) Serve() { }).Warnf("Can't find the neighbor %s", e.MsgSrc) return } - if e.Version != peer.fsm.version { + peer.fsm.lock.RLock() + versionMismatch := e.Version != peer.fsm.version + peer.fsm.lock.RUnlock() + if versionMismatch { log.WithFields(log.Fields{ "Topic": "Peer", }).Debug("FSM version inconsistent") @@ -211,15 +214,23 @@ func (server *BgpServer) Serve() { remoteAddr := ipaddr.String() peer, found := server.neighborMap[remoteAddr] if found { - if peer.fsm.adminState != ADMIN_STATE_UP { + peer.fsm.lock.RLock() + adminStateNotUp := peer.fsm.adminState != ADMIN_STATE_UP + peer.fsm.lock.RUnlock() + if adminStateNotUp { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Remote Addr": remoteAddr, "Admin State": peer.fsm.adminState, }).Debug("New connection for non admin-state-up peer") + peer.fsm.lock.RUnlock() conn.Close() return } + peer.fsm.lock.RLock() + localAddr := peer.fsm.pConf.Transport.Config.LocalAddress + peer.fsm.lock.RUnlock() localAddrValid := func(laddr string) bool { if laddr == "0.0.0.0" || laddr == "::" { return true @@ -241,7 +252,7 @@ func (server *BgpServer) Serve() { return false } return true - }(peer.fsm.pConf.Transport.Config.LocalAddress) + }(localAddr) if !localAddrValid { conn.Close() @@ -269,7 +280,10 @@ func (server *BgpServer) Serve() { conn.Close() return } - server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): peer.fsm.pConf.ApplyPolicy}) + peer.fsm.lock.RLock() + policy := peer.fsm.pConf.ApplyPolicy + peer.fsm.lock.RUnlock() + server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy}) server.neighborMap[remoteAddr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil) @@ -357,12 +371,19 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path { if path == nil { return nil } - if _, ok := peer.fsm.rfMap[path.GetRouteFamily()]; !ok { + + peer.fsm.lock.RLock() + _, ok := peer.fsm.rfMap[path.GetRouteFamily()] + peer.fsm.lock.RUnlock() + if !ok { return nil } //RFC4684 Constrained Route Distribution - if _, y := peer.fsm.rfMap[bgp.RF_RTC_UC]; y && path.GetRouteFamily() != bgp.RF_RTC_UC { + peer.fsm.lock.RLock() + _, y := peer.fsm.rfMap[bgp.RF_RTC_UC] + peer.fsm.lock.RUnlock() + if y && path.GetRouteFamily() != bgp.RF_RTC_UC { ignore := true for _, ext := range path.GetExtCommunities() { for _, p := range peer.adjRibIn.PathList([]bgp.RouteFamily{bgp.RF_RTC_UC}, true) { @@ -400,7 +421,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path { // RFC4456 8. Avoiding Routing Information Loops // A router that recognizes the ORIGINATOR_ID attribute SHOULD // ignore a route received with its BGP Identifier as the ORIGINATOR_ID. - if id := path.GetOriginatorID(); peer.fsm.gConf.Config.RouterId == id.String() { + peer.fsm.lock.RLock() + routerId := peer.fsm.gConf.Config.RouterId + peer.fsm.lock.RUnlock() + if id := path.GetOriginatorID(); routerId == id.String() { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -417,7 +441,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path { // If the local CLUSTER_ID is found in the CLUSTER_LIST, // the advertisement received SHOULD be ignored. for _, clusterID := range path.GetClusterList() { - if clusterID.Equal(peer.fsm.peerInfo.RouteReflectorClusterID) { + peer.fsm.lock.RLock() + rrClusterID := peer.fsm.peerInfo.RouteReflectorClusterID + peer.fsm.lock.RUnlock() + if clusterID.Equal(rrClusterID) { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -472,11 +499,13 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { // assumes "path" was already sent before. This assumption avoids the // infinite UPDATE loop between Route Reflector and its clients. if path.IsLocal() && path.Equal(old) { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, "Path": path, }).Debug("given rtm nlri is already sent, skipping to advertise") + peer.fsm.lock.RUnlock() return nil } @@ -512,11 +541,14 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { // only allow vpnv4 and vpnv6 paths to be advertised to VRFed neighbors. // also check we can import this path using table.CanImportToVrf() // if we can, make it local path by calling (*Path).ToLocal() - if path != nil && peer.fsm.pConf.Config.Vrf != "" { + peer.fsm.lock.RLock() + peerVrf := peer.fsm.pConf.Config.Vrf + peer.fsm.lock.RUnlock() + if path != nil && peerVrf != "" { if f := path.GetRouteFamily(); f != bgp.RF_IPv4_VPN && f != bgp.RF_IPv6_VPN { return nil } - vrf := peer.localRib.Vrfs[peer.fsm.pConf.Config.Vrf] + vrf := peer.localRib.Vrfs[peerVrf] if table.CanImportToVrf(vrf, path) { path = path.ToLocal() } else { @@ -525,14 +557,17 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { } // replace-peer-as handling + peer.fsm.lock.RLock() if path != nil && !path.IsWithdraw && peer.fsm.pConf.AsPathOptions.State.ReplacePeerAs { path = path.ReplaceAS(peer.fsm.pConf.Config.LocalAs, peer.fsm.pConf.Config.PeerAs) } + peer.fsm.lock.RUnlock() if path = filterpath(peer, path, old); path == nil { return nil } + peer.fsm.lock.RLock() options := &table.PolicyOptions{ Info: peer.fsm.peerInfo, OldNextHop: path.GetNexthop(), @@ -542,6 +577,7 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { if v := s.roaManager.validate(path); v != nil { options.ValidationResult = v } + peer.fsm.lock.RUnlock() path = peer.policy.ApplyPolicy(peer.TableID(), table.POLICY_DIRECTION_EXPORT, path, options) // When 'path' is filtered (path == nil), check 'old' has been sent to this peer. @@ -616,10 +652,14 @@ func (server *BgpServer) notifyBestWatcher(best []*table.Path, multipath [][]*ta func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { // create copy which can be access to without mutex + peer.fsm.lock.RLock() conf := *peer.fsm.pConf + peerAfiSafis := peer.fsm.pConf.AfiSafis + peerCapMap := peer.fsm.capMap + peer.fsm.lock.RUnlock() - conf.AfiSafis = make([]config.AfiSafi, len(peer.fsm.pConf.AfiSafis)) - for i, af := range peer.fsm.pConf.AfiSafis { + conf.AfiSafis = make([]config.AfiSafi, len(peerAfiSafis)) + for i, af := range peerAfiSafis { conf.AfiSafis[i] = af conf.AfiSafis[i].AddPaths.State.Receive = peer.isAddPathReceiveEnabled(af.State.Family) if peer.isAddPathSendEnabled(af.State.Family) { @@ -629,8 +669,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { } } - remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peer.fsm.capMap)) - for _, caps := range peer.fsm.capMap { + remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peerCapMap)) + for _, caps := range peerCapMap { for _, m := range caps { // need to copy all values here buf, _ := m.Serialize() @@ -638,6 +678,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { remoteCap = append(remoteCap, c) } } + + peer.fsm.lock.RLock() conf.State.RemoteCapabilityList = remoteCap conf.State.LocalCapabilityList = capabilitiesFromConfig(peer.fsm.pConf) @@ -663,6 +705,7 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { conf.State.ReceivedOpenMessage, _ = bgp.ParseBGPMessage(buf) conf.State.RemoteRouterId = peer.fsm.peerInfo.ID.To4().String() } + peer.fsm.lock.RUnlock() return &conf } @@ -670,10 +713,12 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta if !server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) || peer == nil { return } + cloned := clonePathList(pathList) if len(cloned) == 0 { return } + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &WatchEventUpdate{ @@ -690,6 +735,7 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta PathList: cloned, Neighbor: server.ToConfig(peer, false), } + peer.fsm.lock.RUnlock() server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev) } @@ -697,10 +743,12 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t if !server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) || peer == nil { return } + cloned := clonePathList(pathList) if len(cloned) == 0 { return } + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &WatchEventUpdate{ @@ -715,6 +763,7 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t PathList: cloned, Neighbor: server.ToConfig(peer, false), } + peer.fsm.lock.RUnlock() server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev) } @@ -722,6 +771,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState { _, rport := peer.fsm.RemoteHostPort() laddr, lport := peer.fsm.LocalHostPort() sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + peer.fsm.lock.RLock() recvOpen := peer.fsm.recvOpen e := &WatchEventPeerState{ PeerAS: peer.fsm.peerInfo.AS, @@ -738,6 +788,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState { Timestamp: time.Now(), PeerInterface: peer.fsm.pConf.Config.NeighborInterface, } + peer.fsm.lock.RUnlock() if m != nil { e.StateReason = m.StateReason @@ -746,7 +797,9 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState { } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e *FsmMsg) { + peer.fsm.lock.RLock() newState := peer.fsm.state + peer.fsm.lock.RUnlock() if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, newWatchEventPeerState(peer, e)) } @@ -754,6 +807,7 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, msg *bgp.BGPMessage, isSent bool) { // validation should be done in the caller of this function + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &WatchEventMessage{ @@ -767,6 +821,7 @@ func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, m Timestamp: timestamp, IsSent: isSent, } + peer.fsm.lock.RUnlock() if !isSent { server.notifyWatcher(WATCH_EVENT_TYPE_RECV_MSG, ev) } @@ -806,14 +861,20 @@ func (s *BgpServer) getBestFromLocal(peer *Peer, rfList []bgp.RouteFamily) ([]*t } func (s *BgpServer) processOutgoingPaths(peer *Peer, paths, olds []*table.Path) []*table.Path { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if notEstablished { return nil } - if peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + if localRestarting { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Debug("now syncing, suppress sending updates") + peer.fsm.lock.RUnlock() return nil } @@ -835,7 +896,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path { m := e.MsgData.(*bgp.BGPMessage) rr := m.Body.(*bgp.BGPRouteRefresh) rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI) - if _, ok := peer.fsm.rfMap[rf]; !ok { + + peer.fsm.lock.RLock() + _, ok := peer.fsm.rfMap[rf] + peer.fsm.lock.RUnlock() + if !ok { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -843,7 +908,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path { }).Warn("Route family isn't supported") return nil } - if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok { + + peer.fsm.lock.RLock() + _, ok = peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH] + peer.fsm.lock.RUnlock() + if !ok { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -861,7 +930,12 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path { func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) { rs := peer != nil && peer.isRouteServerClient() - vrf := !rs && peer != nil && peer.fsm.pConf.Config.Vrf != "" + vrf := false + if peer != nil { + peer.fsm.lock.RLock() + vrf = !rs && peer.fsm.pConf.Config.Vrf != "" + peer.fsm.lock.RUnlock() + } tableId := table.GLOBAL_RIB_NAME rib := server.globalRib @@ -872,13 +946,18 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) { for _, path := range pathList { if vrf { - path = path.ToGlobal(rib.Vrfs[peer.fsm.pConf.Config.Vrf]) + peer.fsm.lock.RLock() + peerVrf := peer.fsm.pConf.Config.Vrf + peer.fsm.lock.RUnlock() + path = path.ToGlobal(rib.Vrfs[peerVrf]) } policyOptions := &table.PolicyOptions{} if !rs && peer != nil { + peer.fsm.lock.RLock() policyOptions.Info = peer.fsm.peerInfo + peer.fsm.lock.RUnlock() } if v := server.roaManager.validate(path); v != nil { policyOptions.ValidationResult = v @@ -949,12 +1028,16 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) { } func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) { + peer.fsm.lock.RLock() + peerInfo := peer.fsm.peerInfo + peer.fsm.lock.RUnlock() + rib := server.globalRib if peer.isRouteServerClient() { rib = server.rsRib } for _, family := range peer.toGlobalFamilies(families) { - for _, path := range rib.GetPathListByPeer(peer.fsm.peerInfo, family) { + for _, path := range rib.GetPathListByPeer(peerInfo, family) { p := path.Clone(true) if dsts := rib.Update(p); len(dsts) > 0 { server.propagateUpdateToNeighbors(peer, p, dsts, false) @@ -995,7 +1078,10 @@ func (server *BgpServer) propagateUpdateToNeighbors(source *Peer, newPath *table continue } f := func() bgp.RouteFamily { - if targetPeer.fsm.pConf.Config.Vrf != "" { + targetPeer.fsm.lock.RLock() + peerVrf := targetPeer.fsm.pConf.Config.Vrf + targetPeer.fsm.lock.RUnlock() + if peerVrf != "" { switch family { case bgp.RF_IPv4_VPN: return bgp.RF_IPv4_UC @@ -1047,19 +1133,31 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { switch e.MsgType { case FSM_MSG_STATE_CHANGE: nextState := e.MsgData.(bgp.FSMState) + peer.fsm.lock.Lock() oldState := bgp.FSMState(peer.fsm.pConf.State.SessionState.ToInt()) peer.fsm.pConf.State.SessionState = config.IntToSessionStateMap[int(nextState)] + peer.fsm.lock.Unlock() + peer.fsm.StateChange(nextState) + peer.fsm.lock.RLock() + nextStateIdle := peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE + peer.fsm.lock.RUnlock() + // PeerDown if oldState == bgp.BGP_FSM_ESTABLISHED { t := time.Now() + peer.fsm.lock.Lock() if t.Sub(time.Unix(peer.fsm.pConf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD { peer.fsm.pConf.State.Flops++ } + graceful := peer.fsm.reason.Type == FSM_GRACEFUL_RESTART + peer.fsm.lock.Unlock() var drop []bgp.RouteFamily - if peer.fsm.reason.Type == FSM_GRACEFUL_RESTART { + if graceful { + peer.fsm.lock.Lock() peer.fsm.pConf.GracefulRestart.State.PeerRestarting = true + peer.fsm.lock.Unlock() var p []bgp.RouteFamily p, drop = peer.forwardingPreservedFamilies() server.propagateUpdate(peer, peer.StaleAll(p)) @@ -1069,19 +1167,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { peer.prefixLimitWarned = make(map[bgp.RouteFamily]bool) peer.DropAll(drop) server.dropPeerAllRoutes(peer, drop) + + peer.fsm.lock.Lock() if peer.fsm.pConf.Config.PeerAs == 0 { peer.fsm.pConf.State.PeerAs = 0 peer.fsm.peerInfo.AS = 0 } + peer.fsm.lock.Unlock() + if peer.isDynamicNeighbor() { peer.stopPeerRestarting() go peer.stopFSM() + peer.fsm.lock.RLock() delete(server.neighborMap, peer.fsm.pConf.State.NeighborAddress) + peer.fsm.lock.RUnlock() server.broadcastPeerState(peer, oldState, e) return } - } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE { - if peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled { + } else if nextStateIdle { + peer.fsm.lock.RLock() + longLivedEnabled := peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled + peer.fsm.lock.RUnlock() + if longLivedEnabled { llgr, no_llgr := peer.llgrFamilies() peer.DropAll(no_llgr) @@ -1141,7 +1248,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // If the session does not get re-established within the "Restart Time" // that the peer advertised previously, the Receiving Speaker MUST // delete all the stale routes from the peer that it is retaining. + peer.fsm.lock.Lock() peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false + peer.fsm.lock.Unlock() peer.DropAll(peer.configuredRFlist()) server.dropPeerAllRoutes(peer, peer.configuredRFlist()) } @@ -1153,19 +1262,25 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // update for export policy laddr, _ := peer.fsm.LocalHostPort() // may include zone info + peer.fsm.lock.Lock() peer.fsm.pConf.Transport.State.LocalAddress = laddr // exclude zone info ipaddr, _ := net.ResolveIPAddr("ip", laddr) peer.fsm.peerInfo.LocalAddress = ipaddr.IP + neighborAddress := peer.fsm.pConf.State.NeighborAddress + peer.fsm.lock.Unlock() deferralExpiredFunc := func(family bgp.RouteFamily) func() { return func() { server.mgmtOperation(func() error { - server.softResetOut(peer.fsm.pConf.State.NeighborAddress, family, true) + server.softResetOut(neighborAddress, family, true) return nil }, false) } } - if !peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + peer.fsm.lock.RLock() + notLocalRestarting := !peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if notLocalRestarting { // When graceful-restart cap (which means intention // of sending EOR) and route-target address family are negotiated, // send route-target NLRIs first, and wait to send others @@ -1176,8 +1291,12 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // waiting sending non-route-target NLRIs since the peer won't send // any routes (and EORs) before we send ours (or deferral-timer expires). var pathList []*table.Path + peer.fsm.lock.RLock() _, y := peer.fsm.rfMap[bgp.RF_RTC_UC] - if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !peer.fsm.pConf.GracefulRestart.State.PeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 { + c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC) + notPeerRestarting := !peer.fsm.pConf.GracefulRestart.State.PeerRestarting + peer.fsm.lock.RUnlock() + if y && notPeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 { pathList, _ = server.getBestFromLocal(peer, []bgp.RouteFamily{bgp.RF_RTC_UC}) t := c.RouteTargetMembership.Config.DeferralTime for _, f := range peer.negotiatedRFList() { @@ -1211,7 +1330,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { }() if allEnd { for _, p := range server.neighborMap { + p.fsm.lock.Lock() p.fsm.pConf.GracefulRestart.State.LocalRestarting = false + p.fsm.lock.Unlock() if !p.isGracefulRestartEnabled() { continue } @@ -1224,7 +1345,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { "Topic": "Server", }).Info("sync finished") } else { + peer.fsm.lock.RLock() deferral := peer.fsm.pConf.GracefulRestart.Config.DeferralTime + peer.fsm.lock.RUnlock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -1236,7 +1359,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { if server.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE { die := true for _, p := range server.neighborMap { - if p.fsm.state != bgp.BGP_FSM_IDLE { + p.fsm.lock.RLock() + stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE + p.fsm.lock.RUnlock() + if stateNotIdle { die = false break } @@ -1245,19 +1371,30 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { server.shutdownWG.Done() } } + peer.fsm.lock.Lock() peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix() + peer.fsm.lock.Unlock() } // clear counter - if peer.fsm.adminState == ADMIN_STATE_DOWN { + peer.fsm.lock.RLock() + adminStateDown := peer.fsm.adminState == ADMIN_STATE_DOWN + peer.fsm.lock.RUnlock() + if adminStateDown { + peer.fsm.lock.Lock() peer.fsm.pConf.State = config.NeighborState{} peer.fsm.pConf.State.NeighborAddress = peer.fsm.pConf.Config.NeighborAddress peer.fsm.pConf.State.PeerAs = peer.fsm.pConf.Config.PeerAs peer.fsm.pConf.Timers.State = config.TimersState{} + peer.fsm.lock.Unlock() } peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, oldState, e) case FSM_MSG_ROUTE_REFRESH: - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime + peer.fsm.lock.RUnlock() + if notEstablished || beforeUptime { return } if paths := server.handleRouteRefresh(peer, e); len(paths) > 0 { @@ -1271,7 +1408,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { return case *bgp.BGPMessage: server.notifyRecvMessageWatcher(peer, e.timestamp, m) - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime + peer.fsm.lock.RUnlock() + if notEstablished || beforeUptime { return } pathList, eor, notification := peer.handleUpdate(e) @@ -1287,15 +1428,20 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { server.propagateUpdate(peer, pathList) } + peer.fsm.lock.RLock() + peerAfiSafis := peer.fsm.pConf.AfiSafis + peer.fsm.lock.RUnlock() if len(eor) > 0 { rtc := false for _, f := range eor { if f == bgp.RF_RTC_UC { rtc = true } - for i, a := range peer.fsm.pConf.AfiSafis { + for i, a := range peerAfiSafis { if a.State.Family == f { + peer.fsm.lock.Lock() peer.fsm.pConf.AfiSafis[i].MpGracefulRestart.State.EndOfRibReceived = true + peer.fsm.lock.Unlock() } } } @@ -1307,7 +1453,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // End-of-RIB marker from all its peers (excluding the ones with the // "Restart State" bit set in the received capability and excluding the // ones that do not advertise the graceful restart capability) or ...snip... - if peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + + peer.fsm.lock.RLock() + localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if localRestarting { allEnd := func() bool { for _, p := range server.neighborMap { if !p.recvedAllEOR() { @@ -1318,7 +1468,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { }() if allEnd { for _, p := range server.neighborMap { + p.fsm.lock.Lock() p.fsm.pConf.GracefulRestart.State.LocalRestarting = false + p.fsm.lock.Unlock() if !p.isGracefulRestartEnabled() { continue } @@ -1336,14 +1488,19 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // we don't delay non-route-target NLRIs when local-restarting rtc = false } - if peer.fsm.pConf.GracefulRestart.State.PeerRestarting { + peer.fsm.lock.RLock() + peerRestarting := peer.fsm.pConf.GracefulRestart.State.PeerRestarting + peer.fsm.lock.RUnlock() + if peerRestarting { if peer.recvedAllEOR() { peer.stopPeerRestarting() pathList := peer.adjRibIn.DropStale(peer.configuredRFlist()) + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Debugf("withdraw %d stale routes", len(pathList)) + peer.fsm.lock.RUnlock() server.propagateUpdate(peer, pathList) } @@ -1353,7 +1510,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // received EOR of route-target address family // outbound filter is now ready, let's flash non-route-target NLRIs - if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 { + peer.fsm.lock.RLock() + c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC) + peer.fsm.lock.RUnlock() + if rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -1442,11 +1602,13 @@ func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) error { ap := make(map[string]config.ApplyPolicy, len(s.neighborMap)+1) ap[table.GLOBAL_RIB_NAME] = s.bgpConfig.Global.ApplyPolicy for _, peer := range s.neighborMap { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Info("call set policy") ap[peer.ID()] = peer.fsm.pConf.ApplyPolicy + peer.fsm.lock.RUnlock() } return s.policy.Reset(&policy, ap) }, false) @@ -1731,7 +1893,10 @@ func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInte func (s *BgpServer) DeleteVrf(name string) error { return s.mgmtOperation(func() error { for _, n := range s.neighborMap { - if n.fsm.pConf.Config.Vrf == name { + n.fsm.lock.RLock() + peerVrf := n.fsm.pConf.Config.Vrf + n.fsm.lock.RUnlock() + if peerVrf == name { return fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID()) } } @@ -1792,7 +1957,11 @@ func (s *BgpServer) softResetIn(addr string, family bgp.RouteFamily) error { // route should be excluded from the Phase 2 decision function. isLooped := false if aspath := path.GetAsPath(); aspath != nil { - isLooped = hasOwnASLoop(peer.fsm.peerInfo.LocalAS, int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs), aspath) + peer.fsm.lock.RLock() + localAS := peer.fsm.peerInfo.LocalAS + allowOwnAS := int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs) + peer.fsm.lock.RUnlock() + isLooped = hasOwnASLoop(localAS, allowOwnAS, aspath) } if path.IsAsLooped() != isLooped { // can't modify the existing one. needs to create one @@ -1816,21 +1985,30 @@ func (s *BgpServer) softResetOut(addr string, family bgp.RouteFamily, deferral b return err } for _, peer := range peers { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + peer.fsm.lock.RUnlock() + if notEstablished { continue } families := familiesForSoftreset(peer, family) if deferral { + peer.fsm.lock.RLock() _, y := peer.fsm.rfMap[bgp.RF_RTC_UC] - if peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC) + restarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if restarting { + peer.fsm.lock.Lock() peer.fsm.pConf.GracefulRestart.State.LocalRestarting = false log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), "Families": families, }).Debug("deferral timer expired") - } else if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !c.MpGracefulRestart.State.EndOfRibReceived { + peer.fsm.lock.Unlock() + } else if y && !c.MpGracefulRestart.State.EndOfRibReceived { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -2038,7 +2216,10 @@ func (s *BgpServer) GetNeighbor(address string, getAdvertised bool) (l []*config s.mgmtOperation(func() error { l = make([]*config.Neighbor, 0, len(s.neighborMap)) for k, peer := range s.neighborMap { - if address != "" && address != k && address != peer.fsm.pConf.Config.NeighborInterface { + peer.fsm.lock.RLock() + neighborIface := peer.fsm.pConf.Config.NeighborInterface + peer.fsm.lock.RUnlock() + if address != "" && address != k && address != neighborIface { continue } l = append(l, s.ToConfig(peer, getAdvertised)) @@ -2222,7 +2403,10 @@ func (s *BgpServer) DeletePeerGroup(c *config.PeerGroup) error { return s.mgmtOperation(func() error { name := c.Config.PeerGroupName for _, n := range s.neighborMap { - if n.fsm.pConf.Config.PeerGroup == name { + n.fsm.lock.RLock() + peerGroup := n.fsm.pConf.Config.PeerGroup + n.fsm.lock.RUnlock() + if peerGroup == name { return fmt.Errorf("failed to delete peer-group %s: neighbor %s is in use", name, n.ID()) } } @@ -2411,7 +2595,9 @@ func (s *BgpServer) ResetNeighbor(addr, communication string) error { } peers, _ := s.addrToPeers(addr) for _, peer := range peers { + peer.fsm.lock.Lock() peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset + peer.fsm.lock.Unlock() } return nil }, true) @@ -2426,12 +2612,16 @@ func (s *BgpServer) setAdminState(addr, communication string, enable bool) error f := func(stateOp *AdminStateOperation, message string) { select { case peer.fsm.adminStateCh <- *stateOp: + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Debug(message) + peer.fsm.lock.RUnlock() default: + peer.fsm.lock.RLock() log.Warning("previous request is still remaining. : ", peer.fsm.pConf.State.NeighborAddress) + peer.fsm.lock.RUnlock() } } if enable { @@ -2942,7 +3132,10 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } if w.opts.initPeerState { for _, peer := range s.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + peer.fsm.lock.RUnlock() + if notEstablished { continue } w.notify(newWatchEventPeerState(peer, nil)) @@ -2956,14 +3149,18 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } if w.opts.initUpdate { for _, peer := range s.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + peer.fsm.lock.RUnlock() + if notEstablished { continue } configNeighbor := w.s.ToConfig(peer, false) for _, rf := range peer.configuredRFlist() { + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - w.notify(&WatchEventUpdate{ + update := &WatchEventUpdate{ PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, PeerAddress: peer.fsm.peerInfo.Address, @@ -2974,11 +3171,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { PostPolicy: false, Neighbor: configNeighbor, PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false), - }) + } + peer.fsm.lock.RUnlock() + w.notify(update) eor := bgp.NewEndOfRib(rf) eorBuf, _ := eor.Serialize() - w.notify(&WatchEventUpdate{ + peer.fsm.lock.RLock() + update = &WatchEventUpdate{ Message: eor, PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, @@ -2991,7 +3191,9 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { Payload: eorBuf, PostPolicy: false, Neighbor: configNeighbor, - }) + } + peer.fsm.lock.RUnlock() + w.notify(update) } } } |