author     jhserrano <jhserrano.github@gmail.com>           2018-07-09 19:48:57 +0000
committer  FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>  2018-07-19 22:23:29 +0900
commit     fb999f325b7b160df371145eb6fb29fbb5c73f21 (patch)
tree       40406cf792a56e41662b6d2e07dd8648064c5aa2
parent     695fb5298edf8f912a9d11922e96f23c6464ba58 (diff)
fix races and enable race detector in unittest
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
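The bulk of this patch guards the FSM's shared fields (`fsm.pConf`, `fsm.state`, `fsm.peerInfo`, `fsm.capMap`, and so on) with a new `sync.RWMutex` and converts callers to either a write-locked mutator or a "snapshot under RLock, then use" read. The following is a minimal sketch of that pattern only, assuming simplified stand-in types and field names; it is not the actual GoBGP code.

```go
package main

import (
	"fmt"
	"sync"
)

// fsm is a trimmed stand-in for the real FSM type: one RWMutex
// guards every field that the FSM goroutine and the server
// goroutine may touch concurrently.
type fsm struct {
	lock  sync.RWMutex
	state string
	addr  string
}

// stateChange is a writer: it holds the write lock for the whole update.
func (f *fsm) stateChange(next string) {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.state = next
}

// report is a reader: it copies the fields it needs under the read
// lock, releases it, and only then does slow work (logging, I/O),
// so the lock is never held across blocking calls.
func (f *fsm) report() {
	f.lock.RLock()
	state, addr := f.state, f.addr
	f.lock.RUnlock()

	fmt.Printf("peer %s is %s\n", addr, state)
}

func main() {
	f := &fsm{state: "idle", addr: "192.0.2.1"}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); f.stateChange("active") }()
	go func() { defer wg.Done(); f.report() }()
	wg.Wait()
}
```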
-rw-r--r--   .travis.yml            |   2
-rw-r--r--   pkg/server/fsm.go      | 218
-rw-r--r--   pkg/server/peer.go     |  61
-rw-r--r--   pkg/server/server.go   | 300

4 files changed, 510 insertions, 71 deletions
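The `.travis.yml` hunk below switches the whole unit-test run to the race detector when the build architecture is amd64 (where `-race` is supported), while keeping the existing targeted `-race` run of `Test_RaceCondition` in the packet package; the `fsm.go`, `peer.go`, and `server.go` hunks add the locking that lets the suite pass under the detector.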
diff --git a/.travis.yml b/.travis.yml index 428513d8..22027f86 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ _dep_ensure: &_dep_ensure _unittest: &_unittest <<: *_dep_ensure script: - - go test $(go list ./... | grep -v '/vendor/') -timeout 120s + - go test $([ $(go env GOARCH) == 'amd64' ] && echo '-race') $(go list ./... | grep -v '/vendor/') -timeout 120s - if [ "$(go env GOARCH)" = "amd64" ]; then go test -race github.com/osrg/gobgp/pkg/packet/bgp -run ^Test_RaceCondition$; else echo 'skip'; fi _build: &_build diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go index d0cc4f86..073cd88f 100644 --- a/pkg/server/fsm.go +++ b/pkg/server/fsm.go @@ -21,6 +21,7 @@ import ( "math/rand" "net" "strconv" + "sync" "time" "github.com/eapache/channels" @@ -193,6 +194,7 @@ type FSM struct { t tomb.Tomb gConf *config.Global pConf *config.Neighbor + lock sync.RWMutex state bgp.FSMState reason *FsmStateReason conn net.Conn @@ -215,6 +217,8 @@ type FSM struct { } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { + fsm.lock.Lock() + defer fsm.lock.Unlock() state := &fsm.pConf.State.Messages timer := &fsm.pConf.Timers if isIn { @@ -264,6 +268,8 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } func (fsm *FSM) bmpStatsUpdate(statType uint16, increment int) { + fsm.lock.Lock() + defer fsm.lock.Unlock() stats := &fsm.pConf.State.Messages.Received switch statType { // TODO @@ -305,6 +311,9 @@ func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP } func (fsm *FSM) StateChange(nextState bgp.FSMState) { + fsm.lock.Lock() + defer fsm.lock.Unlock() + log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -367,7 +376,11 @@ func (fsm *FSM) LocalHostPort() (string, uint16) { } func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) (*bgp.BGPMessage, error) { - if fsm.h != nil && fsm.h.conn != nil { + fsm.lock.RLock() + established := fsm.h != nil && fsm.h.conn != nil + fsm.lock.RUnlock() + + if established { m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data) b, _ := m.Serialize() _, err := fsm.h.conn.Write(b) @@ -392,7 +405,9 @@ func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) ( } func (fsm *FSM) connectLoop() error { + fsm.lock.RLock() tick := int(fsm.pConf.Timers.Config.ConnectRetry) + fsm.lock.RUnlock() if tick < MIN_CONNECT_RETRY { tick = MIN_CONNECT_RETRY } @@ -403,6 +418,9 @@ func (fsm *FSM) connectLoop() error { timer.Stop() connect := func() { + fsm.lock.RLock() + defer fsm.lock.RUnlock() + addr := fsm.pConf.State.NeighborAddress port := int(bgp.BGP_PORT) if fsm.pConf.Transport.Config.RemotePort != 0 { @@ -460,13 +478,18 @@ func (fsm *FSM) connectLoop() error { for { select { case <-fsm.t.Dying(): + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, }).Debug("stop connect loop") + fsm.lock.RUnlock() return nil case <-timer.C: - if fsm.state == bgp.BGP_FSM_ACTIVE { + fsm.lock.RLock() + ready := fsm.state == bgp.BGP_FSM_ACTIVE + fsm.lock.RUnlock() + if ready { go connect() } case <-fsm.getActiveCh: @@ -504,18 +527,27 @@ func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *F func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm + fsm.lock.RLock() idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime)) + fsm.lock.RUnlock() + for { select { case <-h.t.Dying(): return -1, NewFsmStateReason(FSM_DYING, nil, nil) case 
<-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } case conn, ok := <-fsm.connCh: @@ -523,20 +555,28 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() + case <-idleHoldTimer.C: + fsm.lock.RLock() + adminStateUp := fsm.adminState == ADMIN_STATE_UP + fsm.lock.RUnlock() - if fsm.adminState == ADMIN_STATE_UP { + if adminStateUp { + fsm.lock.Lock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "Duration": fsm.idleHoldTime, }).Debug("IdleHoldTimer expired") fsm.idleHoldTime = HOLDTIME_IDLE + fsm.lock.Unlock() return bgp.BGP_FSM_ACTIVE, NewFsmStateReason(FSM_IDLE_HOLD_TIMER_EXPIRED, nil, nil) } else { @@ -553,7 +593,9 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { case ADMIN_STATE_UP: // restart idle hold timer + fsm.lock.RLock() idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime)) + fsm.lock.RUnlock() } } } @@ -570,9 +612,13 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) { if !ok { break } + fsm.lock.Lock() fsm.conn = conn + fsm.lock.Unlock() ttl := 0 ttlMin := 0 + + fsm.lock.RLock() if fsm.pConf.TtlSecurity.Config.Enabled { ttl = 255 ttlMin = int(fsm.pConf.TtlSecurity.Config.TtlMin) @@ -605,16 +651,22 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) { }).Warnf("cannot set minimal TTL(=%d) for peer: %s", ttl, err) } } + fsm.lock.RUnlock() // we don't implement delayed open timer so move to opensent right // away. 
return bgp.BGP_FSM_OPENSENT, NewFsmStateReason(FSM_NEW_CONNECTION, nil, nil) case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } case err := <-h.stateReasonCh: @@ -791,6 +843,9 @@ func extractRouteFamily(p *bgp.PathAttributeInterface) *bgp.RouteFamily { } func (h *FSMHandler) afiSafiDisable(rf bgp.RouteFamily) string { + h.fsm.lock.Lock() + defer h.fsm.lock.Unlock() + n := bgp.AddressFamilyNameMap[rf] for i, a := range h.fsm.pConf.AfiSafis { @@ -817,36 +872,44 @@ func (h *FSMHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError b handling = factor.ErrorHandling switch handling { case bgp.ERROR_HANDLING_ATTRIBUTE_DISCARD: + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warn("Some attributes were discarded") + h.fsm.lock.RUnlock() case bgp.ERROR_HANDLING_TREAT_AS_WITHDRAW: m.Body = bgp.TreatAsWithdraw(m.Body.(*bgp.BGPUpdate)) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warn("the received Update message was treated as withdraw") + h.fsm.lock.RUnlock() case bgp.ERROR_HANDLING_AFISAFI_DISABLE: rf := extractRouteFamily(factor.ErrorAttribute) if rf == nil { + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), }).Warn("Error occurred during AFI/SAFI disabling") + h.fsm.lock.RUnlock() } else { n := h.afiSafiDisable(*rf) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warnf("Capability %s was disabled", n) + h.fsm.lock.RUnlock() } } } else { @@ -874,6 +937,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { err = hd.DecodeFromBytes(headerBuf) if err != nil { h.fsm.bgpMessageStateUpdate(0, true) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, @@ -886,6 +950,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { MsgData: err, Version: h.fsm.version, } + h.fsm.lock.RUnlock() return fmsg, err } @@ -896,10 +961,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { } now := time.Now() - useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw handling := bgp.ERROR_HANDLING_NONE - m, err := bgp.ParseBGPBody(hd, bodyBuf, h.fsm.marshallingOptions) + h.fsm.lock.RLock() + useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw + options := h.fsm.marshallingOptions + h.fsm.lock.RUnlock() + + m, err := bgp.ParseBGPBody(hd, bodyBuf, options) if err != nil { handling = h.handlingError(m, err, useRevisedError) h.fsm.bgpMessageStateUpdate(0, true) @@ -907,29 +976,38 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { h.fsm.bgpMessageStateUpdate(m.Header.Type, true) err = bgp.ValidateBGPMessage(m) } + h.fsm.lock.RLock() fmsg := &FsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, MsgSrc: h.fsm.pConf.State.NeighborAddress, timestamp: now, Version: 
h.fsm.version, } + h.fsm.lock.RUnlock() switch handling { case bgp.ERROR_HANDLING_AFISAFI_DISABLE: fmsg.MsgData = m return fmsg, nil case bgp.ERROR_HANDLING_SESSION_RESET: + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP message") + h.fsm.lock.RUnlock() fmsg.MsgData = err return fmsg, err default: fmsg.MsgData = m - if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { + + h.fsm.lock.RLock() + establishedState := h.fsm.state == bgp.BGP_FSM_ESTABLISHED + h.fsm.lock.RUnlock() + + if establishedState { switch m.Header.Type { case bgp.BGP_MSG_ROUTE_REFRESH: fmsg.MsgType = FSM_MSG_ROUTE_REFRESH @@ -942,17 +1020,22 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { copy(fmsg.payload, headerBuf) copy(fmsg.payload[len(headerBuf):], bodyBuf) - ok, err := bgp.ValidateUpdateMsg(body, h.fsm.rfMap, isEBGP, isConfed) + h.fsm.lock.RLock() + rfMap := h.fsm.rfMap + h.fsm.lock.RUnlock() + ok, err := bgp.ValidateUpdateMsg(body, rfMap, isEBGP, isConfed) if !ok { handling = h.handlingError(m, err, useRevisedError) } if handling == bgp.ERROR_HANDLING_SESSION_RESET { + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP update message") + h.fsm.lock.RUnlock() fmsg.MsgData = err return fmsg, err } @@ -974,7 +1057,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { return fmsg, err } - fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo, fmsg.timestamp) + h.fsm.lock.RLock() + peerInfo := h.fsm.peerInfo + h.fsm.lock.RUnlock() + fmsg.PathList = table.ProcessMessage(m, peerInfo, fmsg.timestamp) fallthrough case bgp.BGP_MSG_KEEPALIVE: // if the length of h.holdTimerResetCh @@ -991,6 +1077,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { body := m.Body.(*bgp.BGPNotification) if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { communication, rest := decodeAdministrativeCommunication(body.Data) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, @@ -999,7 +1086,9 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { "Communicated-Reason": communication, "Data": rest, }).Warn("received notification") + h.fsm.lock.RUnlock() } else { + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, @@ -1007,9 +1096,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { "Subcode": body.ErrorSubcode, "Data": body.Data, }).Warn("received notification") + h.fsm.lock.RUnlock() } - if s := h.fsm.pConf.GracefulRestart.State; s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET { + h.fsm.lock.RLock() + s := h.fsm.pConf.GracefulRestart.State + hardReset := s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET + h.fsm.lock.RUnlock() + if hardReset { sendToStateReasonCh(FSM_HARD_RESET, m) } else { sendToStateReasonCh(FSM_NOTIFICATION_RECV, m) @@ -1089,13 +1183,20 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode] func (h *FSMHandler) opensent() (bgp.FSMState, 
*FsmStateReason) { fsm := h.fsm + + fsm.lock.RLock() m := buildopen(fsm.gConf, fsm.pConf) + fsm.lock.RUnlock() + b, _ := m.Serialize() fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) h.msgCh = channels.NewInfiniteChannel() + + fsm.lock.RLock() h.conn = fsm.conn + fsm.lock.RUnlock() h.t.Go(h.recvMessage) @@ -1103,7 +1204,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { // sets its HoldTimer to a large value // A HoldTimer value of 4 minutes is suggested as a "large value" // for the HoldTimer + fsm.lock.RLock() holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime)) + fsm.lock.RUnlock() for { select { @@ -1115,18 +1218,25 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() h.conn.Close() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } @@ -1139,16 +1249,27 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) if m.Header.Type == bgp.BGP_MSG_OPEN { + fsm.lock.Lock() fsm.recvOpen = m + fsm.lock.Unlock() + body := m.Body.(*bgp.BGPOpen) - peerAs, err := bgp.ValidateOpenMsg(body, fsm.pConf.Config.PeerAs) + + fsm.lock.RLock() + fsmPeerAS := fsm.pConf.Config.PeerAs + fsm.lock.RUnlock() + peerAs, err := bgp.ValidateOpenMsg(body, fsmPeerAS) if err != nil { m, _ := fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError)) return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil) } // ASN negotiation was skipped - if fsm.pConf.Config.PeerAs == 0 { + fsm.lock.RLock() + asnNegotiationSkipped := fsm.pConf.Config.PeerAs == 0 + fsm.lock.RUnlock() + if asnNegotiationSkipped { + fsm.lock.Lock() typ := config.PEER_TYPE_EXTERNAL if fsm.peerInfo.LocalAS == peerAs { typ = config.PEER_TYPE_INTERNAL @@ -1159,9 +1280,13 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Infof("skipped asn negotiation: peer-as: %d, peer-type: %s", peerAs, typ) + fsm.lock.Unlock() } else { + fsm.lock.Lock() fsm.pConf.State.PeerType = fsm.pConf.Config.PeerType + fsm.lock.Unlock() } + fsm.lock.Lock() fsm.pConf.State.PeerAs = peerAs fsm.peerInfo.AS = peerAs fsm.peerInfo.ID = body.ID @@ -1264,6 +1389,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { } } + fsm.lock.Unlock() msg := bgp.NewBGPKeepAliveMessage() b, _ := msg.Serialize() fsm.conn.Write(b) @@ -1313,6 +1439,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { } func keepaliveTicker(fsm *FSM) *time.Ticker { + fsm.lock.RLock() + defer fsm.lock.RUnlock() + negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime if negotiatedTime == 0 { return &time.Ticker{} @@ -1328,6 +1457,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) h.msgCh = channels.NewInfiniteChannel() + fsm.lock.RLock() h.conn 
= fsm.conn h.t.Go(h.recvMessage) @@ -1340,6 +1470,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { // sets the HoldTimer according to the negotiated value holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } + fsm.lock.RUnlock() for { select { @@ -1351,18 +1482,25 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() h.conn.Close() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } @@ -1429,6 +1567,7 @@ func (h *FSMHandler) sendMessageloop() error { fsm := h.fsm ticker := keepaliveTicker(fsm) send := func(m *bgp.BGPMessage) error { + fsm.lock.RLock() if fsm.twoByteAsTrans && m.Header.Type == bgp.BGP_MSG_UPDATE { log.WithFields(log.Fields{ "Topic": "Peer", @@ -1440,29 +1579,37 @@ func (h *FSMHandler) sendMessageloop() error { table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate)) } b, err := m.Serialize(h.fsm.marshallingOptions) + fsm.lock.RUnlock() if err != nil { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "Data": err, }).Warn("failed to serialize") + fsm.lock.RUnlock() fsm.bgpMessageStateUpdate(0, false) return nil } - if err := conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))); err != nil { + fsm.lock.RLock() + err = conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))) + fsm.lock.RUnlock() + if err != nil { h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil) conn.Close() return fmt.Errorf("failed to set write deadline") } _, err = conn.Write(b) if err != nil { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "Data": err, }).Warn("failed to send") + fsm.lock.RUnlock() h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil) conn.Close() return fmt.Errorf("closed") @@ -1474,6 +1621,7 @@ func (h *FSMHandler) sendMessageloop() error { body := m.Body.(*bgp.BGPNotification) if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { communication, rest := decodeAdministrativeCommunication(body.Data) + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -1483,7 +1631,9 @@ func (h *FSMHandler) sendMessageloop() error { "Communicated-Reason": communication, "Data": rest, }).Warn("sent notification") + fsm.lock.RUnlock() } else { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -1492,12 +1642,14 @@ func (h *FSMHandler) sendMessageloop() error { "Subcode": body.ErrorSubcode, "Data": body.Data, }).Warn("sent notification") + fsm.lock.RUnlock() 
} h.stateReasonCh <- *NewFsmStateReason(FSM_NOTIFICATION_SENT, m, nil) conn.Close() return fmt.Errorf("closed") case bgp.BGP_MSG_UPDATE: update := m.Body.(*bgp.BGPUpdate) + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -1506,13 +1658,16 @@ func (h *FSMHandler) sendMessageloop() error { "withdrawals": update.WithdrawnRoutes, "attributes": update.PathAttributes, }).Debug("sent update") + fsm.lock.RUnlock() default: + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "data": m, }).Debug("sent") + fsm.lock.RUnlock() } return nil } @@ -1523,7 +1678,10 @@ func (h *FSMHandler) sendMessageloop() error { return nil case o := <-h.outgoing.Out(): m := o.(*FsmOutgoingMsg) - for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, h.fsm.marshallingOptions) { + h.fsm.lock.RLock() + options := h.fsm.marshallingOptions + h.fsm.lock.RUnlock() + for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) { if err := send(msg); err != nil { return nil } @@ -1560,7 +1718,9 @@ func (h *FSMHandler) recvMessageloop() error { func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm + fsm.lock.Lock() h.conn = fsm.conn + fsm.lock.Unlock() h.t.Go(h.sendMessageloop) h.msgCh = h.incoming h.t.Go(h.recvMessageloop) @@ -1569,7 +1729,9 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { holdTimer = &time.Timer{} } else { + fsm.lock.RLock() holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) + fsm.lock.RUnlock() } fsm.gracefulRestartTimer.Stop() @@ -1583,14 +1745,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() case err := <-h.stateReasonCh: h.conn.Close() h.t.Kill(nil) + fsm.lock.RLock() if s := fsm.pConf.GracefulRestart.State; s.Enabled && (s.NotificationEnabled && err.Type == FSM_NOTIFICATION_RECV || err.Type == FSM_READ_FAILED || @@ -1603,20 +1768,25 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { }).Info("peer graceful restart") fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second) } + fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, &err case <-holdTimer.C: + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("hold timer expired") + fsm.lock.RUnlock() m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil) case <-h.holdTimerResetCh: + fsm.lock.RLock() if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 { holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } + fsm.lock.RUnlock() case stateOp := <-fsm.adminStateCh: err := h.changeAdminState(stateOp.State) if err == nil { @@ -1633,12 +1803,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { func (h *FSMHandler) loop() error { fsm := h.fsm ch := make(chan bgp.FSMState) + fsm.lock.RLock() oldState := fsm.state + fsm.lock.RUnlock() var reason *FsmStateReason f := func() error { nextState := 
bgp.FSMState(-1) - switch fsm.state { + fsm.lock.RLock() + fsmState := fsm.state + fsm.lock.RUnlock() + switch fsmState { case bgp.BGP_FSM_IDLE: nextState, reason = h.idle() // case bgp.BGP_FSM_CONNECT: @@ -1661,6 +1836,7 @@ func (h *FSMHandler) loop() error { nextState := <-ch + fsm.lock.RLock() if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM { log.WithFields(log.Fields{ "Topic": "Peer", @@ -1685,6 +1861,7 @@ func (h *FSMHandler) loop() error { "Reason": reason.String(), }).Info("Peer Down") } + fsm.lock.RUnlock() e := time.AfterFunc(time.Second*120, func() { log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState) @@ -1694,6 +1871,7 @@ func (h *FSMHandler) loop() error { // under zero means that tomb.Dying() if nextState >= bgp.BGP_FSM_IDLE { + fsm.lock.RLock() h.stateCh <- &FsmMsg{ MsgType: FSM_MSG_STATE_CHANGE, MsgSrc: fsm.pConf.State.NeighborAddress, @@ -1701,11 +1879,15 @@ func (h *FSMHandler) loop() error { StateReason: reason, Version: h.fsm.version, } + fsm.lock.RUnlock() } return nil } func (h *FSMHandler) changeAdminState(s AdminState) error { + h.fsm.lock.Lock() + defer h.fsm.lock.Unlock() + fsm := h.fsm if fsm.adminState != s { log.WithFields(log.Fields{ diff --git a/pkg/server/peer.go b/pkg/server/peer.go index 1f3dd8d8..2f7d6cc2 100644 --- a/pkg/server/peer.go +++ b/pkg/server/peer.go @@ -88,7 +88,9 @@ func newDynamicPeer(g *config.Global, neighborAddress string, pg *config.PeerGro return nil } peer := NewPeer(g, &conf, loc, policy) + peer.fsm.lock.Lock() peer.fsm.state = bgp.BGP_FSM_ACTIVE + peer.fsm.lock.Unlock() return peer } @@ -122,10 +124,14 @@ func NewPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, p } func (peer *Peer) AS() uint32 { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() return peer.fsm.pConf.State.PeerAs } func (peer *Peer) ID() string { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() return peer.fsm.pConf.State.NeighborAddress } @@ -138,18 +144,26 @@ func (peer *Peer) isIBGPPeer() bool { } func (peer *Peer) isRouteServerClient() bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() return peer.fsm.pConf.RouteServer.Config.RouteServerClient } func (peer *Peer) isRouteReflectorClient() bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() return peer.fsm.pConf.RouteReflector.Config.RouteReflectorClient } func (peer *Peer) isGracefulRestartEnabled() bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() return peer.fsm.pConf.GracefulRestart.State.Enabled } func (peer *Peer) getAddPathMode(family bgp.RouteFamily) bgp.BGPAddPathMode { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() if mode, y := peer.fsm.rfMap[family]; y { return mode } @@ -165,10 +179,14 @@ func (peer *Peer) isAddPathSendEnabled(family bgp.RouteFamily) bool { } func (peer *Peer) isDynamicNeighbor() bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() return peer.fsm.pConf.Config.NeighborAddress == "" && peer.fsm.pConf.Config.NeighborInterface == "" } func (peer *Peer) recvedAllEOR() bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() for _, a := range peer.fsm.pConf.AfiSafis { if s := a.MpGracefulRestart.State; s.Enabled && !s.EndOfRibReceived { return false @@ -178,11 +196,15 @@ func (peer *Peer) recvedAllEOR() bool { } func (peer *Peer) configuredRFlist() []bgp.RouteFamily { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() rfs, _ := 
config.AfiSafis(peer.fsm.pConf.AfiSafis).ToRfList() return rfs } func (peer *Peer) negotiatedRFList() []bgp.RouteFamily { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() l := make([]bgp.RouteFamily, 0, len(peer.fsm.rfMap)) for family, _ := range peer.fsm.rfMap { l = append(l, family) @@ -191,6 +213,8 @@ func (peer *Peer) negotiatedRFList() []bgp.RouteFamily { } func (peer *Peer) toGlobalFamilies(families []bgp.RouteFamily) []bgp.RouteFamily { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() if peer.fsm.pConf.Config.Vrf != "" { fs := make([]bgp.RouteFamily, 0, len(families)) for _, f := range families { @@ -233,26 +257,32 @@ func classifyFamilies(all, part []bgp.RouteFamily) ([]bgp.RouteFamily, []bgp.Rou } func (peer *Peer) forwardingPreservedFamilies() ([]bgp.RouteFamily, []bgp.RouteFamily) { + peer.fsm.lock.RLock() list := []bgp.RouteFamily{} for _, a := range peer.fsm.pConf.AfiSafis { if s := a.MpGracefulRestart.State; s.Enabled && s.Received { list = append(list, a.State.Family) } } + peer.fsm.lock.RUnlock() return classifyFamilies(peer.configuredRFlist(), list) } func (peer *Peer) llgrFamilies() ([]bgp.RouteFamily, []bgp.RouteFamily) { + peer.fsm.lock.RLock() list := []bgp.RouteFamily{} for _, a := range peer.fsm.pConf.AfiSafis { if a.LongLivedGracefulRestart.State.Enabled { list = append(list, a.State.Family) } } + peer.fsm.lock.RUnlock() return classifyFamilies(peer.configuredRFlist(), list) } func (peer *Peer) isLLGREnabledFamily(family bgp.RouteFamily) bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() if !peer.fsm.pConf.GracefulRestart.Config.LongLivedEnabled { return false } @@ -266,6 +296,8 @@ func (peer *Peer) isLLGREnabledFamily(family bgp.RouteFamily) bool { } func (peer *Peer) llgrRestartTime(family bgp.RouteFamily) uint32 { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() for _, a := range peer.fsm.pConf.AfiSafis { if a.State.Family == family { return a.LongLivedGracefulRestart.State.PeerRestartTime @@ -275,6 +307,8 @@ func (peer *Peer) llgrRestartTime(family bgp.RouteFamily) uint32 { } func (peer *Peer) llgrRestartTimerExpired(family bgp.RouteFamily) bool { + peer.fsm.lock.RLock() + defer peer.fsm.lock.RUnlock() all := true for _, a := range peer.fsm.pConf.AfiSafis { if a.State.Family == family { @@ -309,6 +343,8 @@ func (peer *Peer) markLLGRStale(fs []bgp.RouteFamily) []*table.Path { } func (peer *Peer) stopPeerRestarting() { + peer.fsm.lock.Lock() + defer peer.fsm.lock.Unlock() peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false for _, ch := range peer.llgrEndChs { close(ch) @@ -378,7 +414,9 @@ func (peer *Peer) doPrefixLimit(k bgp.RouteFamily, c *config.PrefixLimitConfig) } func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) error { + peer.fsm.lock.RLock() x := peer.fsm.pConf.AfiSafis + peer.fsm.lock.RUnlock() y := c if len(x) != len(y) { return fmt.Errorf("changing supported afi-safi is not allowed") @@ -406,7 +444,9 @@ func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) error { } } } + peer.fsm.lock.Lock() peer.fsm.pConf.AfiSafis = c + peer.fsm.lock.Unlock() return nil } @@ -420,7 +460,9 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg "withdrawals": update.WithdrawnRoutes, "attributes": update.PathAttributes, }).Debug("received update") + peer.fsm.lock.Lock() peer.fsm.pConf.Timers.State.UpdateRecvTime = time.Now().Unix() + peer.fsm.lock.Unlock() if len(e.PathList) > 0 { paths := make([]*table.Path, 0, len(e.PathList)) eor := []bgp.RouteFamily{} @@ -440,7 +482,11 @@ func 
(peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg // If the AS_PATH attribute of a BGP route contains an AS loop, the BGP // route should be excluded from the Phase 2 decision function. if aspath := path.GetAsPath(); aspath != nil { - if hasOwnASLoop(peer.fsm.peerInfo.LocalAS, int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs), aspath) { + peer.fsm.lock.RLock() + localAS := peer.fsm.peerInfo.LocalAS + allowOwnAS := int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs) + peer.fsm.lock.RUnlock() + if hasOwnASLoop(localAS, allowOwnAS, aspath) { path.SetAsLooped(true) continue } @@ -448,7 +494,10 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg paths = append(paths, path) } peer.adjRibIn.Update(e.PathList) - for _, af := range peer.fsm.pConf.AfiSafis { + peer.fsm.lock.RLock() + peerAfiSafis := peer.fsm.pConf.AfiSafis + peer.fsm.lock.RUnlock() + for _, af := range peerAfiSafis { if msg := peer.doPrefixLimit(af.State.Family, &af.PrefixLimit.Config); msg != nil { return nil, nil, msg } @@ -459,7 +508,10 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg } func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) { - peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing) + handler := NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing) + peer.fsm.lock.Lock() + peer.fsm.h = handler + peer.fsm.lock.Unlock() } func (peer *Peer) StaleAll(rfList []bgp.RouteFamily) []*table.Path { @@ -484,13 +536,16 @@ func (peer *Peer) DropAll(rfList []bgp.RouteFamily) { func (peer *Peer) stopFSM() error { failed := false + peer.fsm.lock.RLock() addr := peer.fsm.pConf.State.NeighborAddress + peer.fsm.lock.RUnlock() t1 := time.AfterFunc(time.Minute*5, func() { log.WithFields(log.Fields{ "Topic": "Peer", }).Warnf("Failed to free the fsm.h.t for %s", addr) failed = true }) + peer.fsm.h.t.Kill(nil) peer.fsm.h.t.Wait() t1.Stop() diff --git a/pkg/server/server.go b/pkg/server/server.go index b52ad284..78f0c254 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -195,7 +195,10 @@ func (server *BgpServer) Serve() { }).Warnf("Can't find the neighbor %s", e.MsgSrc) return } - if e.Version != peer.fsm.version { + peer.fsm.lock.RLock() + versionMismatch := e.Version != peer.fsm.version + peer.fsm.lock.RUnlock() + if versionMismatch { log.WithFields(log.Fields{ "Topic": "Peer", }).Debug("FSM version inconsistent") @@ -211,15 +214,23 @@ func (server *BgpServer) Serve() { remoteAddr := ipaddr.String() peer, found := server.neighborMap[remoteAddr] if found { - if peer.fsm.adminState != ADMIN_STATE_UP { + peer.fsm.lock.RLock() + adminStateNotUp := peer.fsm.adminState != ADMIN_STATE_UP + peer.fsm.lock.RUnlock() + if adminStateNotUp { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Remote Addr": remoteAddr, "Admin State": peer.fsm.adminState, }).Debug("New connection for non admin-state-up peer") + peer.fsm.lock.RUnlock() conn.Close() return } + peer.fsm.lock.RLock() + localAddr := peer.fsm.pConf.Transport.Config.LocalAddress + peer.fsm.lock.RUnlock() localAddrValid := func(laddr string) bool { if laddr == "0.0.0.0" || laddr == "::" { return true @@ -241,7 +252,7 @@ func (server *BgpServer) Serve() { return false } return true - }(peer.fsm.pConf.Transport.Config.LocalAddress) + }(localAddr) if !localAddrValid { conn.Close() @@ -269,7 +280,10 @@ func (server *BgpServer) Serve() { conn.Close() return } - server.policy.Reset(nil, 
map[string]config.ApplyPolicy{peer.ID(): peer.fsm.pConf.ApplyPolicy}) + peer.fsm.lock.RLock() + policy := peer.fsm.pConf.ApplyPolicy + peer.fsm.lock.RUnlock() + server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy}) server.neighborMap[remoteAddr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil) @@ -357,12 +371,19 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path { if path == nil { return nil } - if _, ok := peer.fsm.rfMap[path.GetRouteFamily()]; !ok { + + peer.fsm.lock.RLock() + _, ok := peer.fsm.rfMap[path.GetRouteFamily()] + peer.fsm.lock.RUnlock() + if !ok { return nil } //RFC4684 Constrained Route Distribution - if _, y := peer.fsm.rfMap[bgp.RF_RTC_UC]; y && path.GetRouteFamily() != bgp.RF_RTC_UC { + peer.fsm.lock.RLock() + _, y := peer.fsm.rfMap[bgp.RF_RTC_UC] + peer.fsm.lock.RUnlock() + if y && path.GetRouteFamily() != bgp.RF_RTC_UC { ignore := true for _, ext := range path.GetExtCommunities() { for _, p := range peer.adjRibIn.PathList([]bgp.RouteFamily{bgp.RF_RTC_UC}, true) { @@ -400,7 +421,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path { // RFC4456 8. Avoiding Routing Information Loops // A router that recognizes the ORIGINATOR_ID attribute SHOULD // ignore a route received with its BGP Identifier as the ORIGINATOR_ID. - if id := path.GetOriginatorID(); peer.fsm.gConf.Config.RouterId == id.String() { + peer.fsm.lock.RLock() + routerId := peer.fsm.gConf.Config.RouterId + peer.fsm.lock.RUnlock() + if id := path.GetOriginatorID(); routerId == id.String() { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -417,7 +441,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path { // If the local CLUSTER_ID is found in the CLUSTER_LIST, // the advertisement received SHOULD be ignored. for _, clusterID := range path.GetClusterList() { - if clusterID.Equal(peer.fsm.peerInfo.RouteReflectorClusterID) { + peer.fsm.lock.RLock() + rrClusterID := peer.fsm.peerInfo.RouteReflectorClusterID + peer.fsm.lock.RUnlock() + if clusterID.Equal(rrClusterID) { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -472,11 +499,13 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { // assumes "path" was already sent before. This assumption avoids the // infinite UPDATE loop between Route Reflector and its clients. if path.IsLocal() && path.Equal(old) { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, "Path": path, }).Debug("given rtm nlri is already sent, skipping to advertise") + peer.fsm.lock.RUnlock() return nil } @@ -512,11 +541,14 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { // only allow vpnv4 and vpnv6 paths to be advertised to VRFed neighbors. 
// also check we can import this path using table.CanImportToVrf() // if we can, make it local path by calling (*Path).ToLocal() - if path != nil && peer.fsm.pConf.Config.Vrf != "" { + peer.fsm.lock.RLock() + peerVrf := peer.fsm.pConf.Config.Vrf + peer.fsm.lock.RUnlock() + if path != nil && peerVrf != "" { if f := path.GetRouteFamily(); f != bgp.RF_IPv4_VPN && f != bgp.RF_IPv6_VPN { return nil } - vrf := peer.localRib.Vrfs[peer.fsm.pConf.Config.Vrf] + vrf := peer.localRib.Vrfs[peerVrf] if table.CanImportToVrf(vrf, path) { path = path.ToLocal() } else { @@ -525,14 +557,17 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { } // replace-peer-as handling + peer.fsm.lock.RLock() if path != nil && !path.IsWithdraw && peer.fsm.pConf.AsPathOptions.State.ReplacePeerAs { path = path.ReplaceAS(peer.fsm.pConf.Config.LocalAs, peer.fsm.pConf.Config.PeerAs) } + peer.fsm.lock.RUnlock() if path = filterpath(peer, path, old); path == nil { return nil } + peer.fsm.lock.RLock() options := &table.PolicyOptions{ Info: peer.fsm.peerInfo, OldNextHop: path.GetNexthop(), @@ -542,6 +577,7 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path { if v := s.roaManager.validate(path); v != nil { options.ValidationResult = v } + peer.fsm.lock.RUnlock() path = peer.policy.ApplyPolicy(peer.TableID(), table.POLICY_DIRECTION_EXPORT, path, options) // When 'path' is filtered (path == nil), check 'old' has been sent to this peer. @@ -616,10 +652,14 @@ func (server *BgpServer) notifyBestWatcher(best []*table.Path, multipath [][]*ta func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { // create copy which can be access to without mutex + peer.fsm.lock.RLock() conf := *peer.fsm.pConf + peerAfiSafis := peer.fsm.pConf.AfiSafis + peerCapMap := peer.fsm.capMap + peer.fsm.lock.RUnlock() - conf.AfiSafis = make([]config.AfiSafi, len(peer.fsm.pConf.AfiSafis)) - for i, af := range peer.fsm.pConf.AfiSafis { + conf.AfiSafis = make([]config.AfiSafi, len(peerAfiSafis)) + for i, af := range peerAfiSafis { conf.AfiSafis[i] = af conf.AfiSafis[i].AddPaths.State.Receive = peer.isAddPathReceiveEnabled(af.State.Family) if peer.isAddPathSendEnabled(af.State.Family) { @@ -629,8 +669,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { } } - remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peer.fsm.capMap)) - for _, caps := range peer.fsm.capMap { + remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peerCapMap)) + for _, caps := range peerCapMap { for _, m := range caps { // need to copy all values here buf, _ := m.Serialize() @@ -638,6 +678,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { remoteCap = append(remoteCap, c) } } + + peer.fsm.lock.RLock() conf.State.RemoteCapabilityList = remoteCap conf.State.LocalCapabilityList = capabilitiesFromConfig(peer.fsm.pConf) @@ -663,6 +705,7 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor { conf.State.ReceivedOpenMessage, _ = bgp.ParseBGPMessage(buf) conf.State.RemoteRouterId = peer.fsm.peerInfo.ID.To4().String() } + peer.fsm.lock.RUnlock() return &conf } @@ -670,10 +713,12 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta if !server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) || peer == nil { return } + cloned := clonePathList(pathList) if len(cloned) == 0 { return } + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := 
peer.fsm.LocalHostPort() ev := &WatchEventUpdate{ @@ -690,6 +735,7 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta PathList: cloned, Neighbor: server.ToConfig(peer, false), } + peer.fsm.lock.RUnlock() server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev) } @@ -697,10 +743,12 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t if !server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) || peer == nil { return } + cloned := clonePathList(pathList) if len(cloned) == 0 { return } + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &WatchEventUpdate{ @@ -715,6 +763,7 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t PathList: cloned, Neighbor: server.ToConfig(peer, false), } + peer.fsm.lock.RUnlock() server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev) } @@ -722,6 +771,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState { _, rport := peer.fsm.RemoteHostPort() laddr, lport := peer.fsm.LocalHostPort() sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + peer.fsm.lock.RLock() recvOpen := peer.fsm.recvOpen e := &WatchEventPeerState{ PeerAS: peer.fsm.peerInfo.AS, @@ -738,6 +788,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState { Timestamp: time.Now(), PeerInterface: peer.fsm.pConf.Config.NeighborInterface, } + peer.fsm.lock.RUnlock() if m != nil { e.StateReason = m.StateReason @@ -746,7 +797,9 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState { } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e *FsmMsg) { + peer.fsm.lock.RLock() newState := peer.fsm.state + peer.fsm.lock.RUnlock() if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, newWatchEventPeerState(peer, e)) } @@ -754,6 +807,7 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, msg *bgp.BGPMessage, isSent bool) { // validation should be done in the caller of this function + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &WatchEventMessage{ @@ -767,6 +821,7 @@ func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, m Timestamp: timestamp, IsSent: isSent, } + peer.fsm.lock.RUnlock() if !isSent { server.notifyWatcher(WATCH_EVENT_TYPE_RECV_MSG, ev) } @@ -806,14 +861,20 @@ func (s *BgpServer) getBestFromLocal(peer *Peer, rfList []bgp.RouteFamily) ([]*t } func (s *BgpServer) processOutgoingPaths(peer *Peer, paths, olds []*table.Path) []*table.Path { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if notEstablished { return nil } - if peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + if localRestarting { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Debug("now syncing, suppress sending updates") + peer.fsm.lock.RUnlock() return nil } @@ -835,7 +896,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path { m := e.MsgData.(*bgp.BGPMessage) rr := m.Body.(*bgp.BGPRouteRefresh) rf := bgp.AfiSafiToRouteFamily(rr.AFI, 
rr.SAFI) - if _, ok := peer.fsm.rfMap[rf]; !ok { + + peer.fsm.lock.RLock() + _, ok := peer.fsm.rfMap[rf] + peer.fsm.lock.RUnlock() + if !ok { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -843,7 +908,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path { }).Warn("Route family isn't supported") return nil } - if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok { + + peer.fsm.lock.RLock() + _, ok = peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH] + peer.fsm.lock.RUnlock() + if !ok { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -861,7 +930,12 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path { func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) { rs := peer != nil && peer.isRouteServerClient() - vrf := !rs && peer != nil && peer.fsm.pConf.Config.Vrf != "" + vrf := false + if peer != nil { + peer.fsm.lock.RLock() + vrf = !rs && peer.fsm.pConf.Config.Vrf != "" + peer.fsm.lock.RUnlock() + } tableId := table.GLOBAL_RIB_NAME rib := server.globalRib @@ -872,13 +946,18 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) { for _, path := range pathList { if vrf { - path = path.ToGlobal(rib.Vrfs[peer.fsm.pConf.Config.Vrf]) + peer.fsm.lock.RLock() + peerVrf := peer.fsm.pConf.Config.Vrf + peer.fsm.lock.RUnlock() + path = path.ToGlobal(rib.Vrfs[peerVrf]) } policyOptions := &table.PolicyOptions{} if !rs && peer != nil { + peer.fsm.lock.RLock() policyOptions.Info = peer.fsm.peerInfo + peer.fsm.lock.RUnlock() } if v := server.roaManager.validate(path); v != nil { policyOptions.ValidationResult = v @@ -949,12 +1028,16 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) { } func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) { + peer.fsm.lock.RLock() + peerInfo := peer.fsm.peerInfo + peer.fsm.lock.RUnlock() + rib := server.globalRib if peer.isRouteServerClient() { rib = server.rsRib } for _, family := range peer.toGlobalFamilies(families) { - for _, path := range rib.GetPathListByPeer(peer.fsm.peerInfo, family) { + for _, path := range rib.GetPathListByPeer(peerInfo, family) { p := path.Clone(true) if dsts := rib.Update(p); len(dsts) > 0 { server.propagateUpdateToNeighbors(peer, p, dsts, false) @@ -995,7 +1078,10 @@ func (server *BgpServer) propagateUpdateToNeighbors(source *Peer, newPath *table continue } f := func() bgp.RouteFamily { - if targetPeer.fsm.pConf.Config.Vrf != "" { + targetPeer.fsm.lock.RLock() + peerVrf := targetPeer.fsm.pConf.Config.Vrf + targetPeer.fsm.lock.RUnlock() + if peerVrf != "" { switch family { case bgp.RF_IPv4_VPN: return bgp.RF_IPv4_UC @@ -1047,19 +1133,31 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { switch e.MsgType { case FSM_MSG_STATE_CHANGE: nextState := e.MsgData.(bgp.FSMState) + peer.fsm.lock.Lock() oldState := bgp.FSMState(peer.fsm.pConf.State.SessionState.ToInt()) peer.fsm.pConf.State.SessionState = config.IntToSessionStateMap[int(nextState)] + peer.fsm.lock.Unlock() + peer.fsm.StateChange(nextState) + peer.fsm.lock.RLock() + nextStateIdle := peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE + peer.fsm.lock.RUnlock() + // PeerDown if oldState == bgp.BGP_FSM_ESTABLISHED { t := time.Now() + peer.fsm.lock.Lock() if t.Sub(time.Unix(peer.fsm.pConf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD { peer.fsm.pConf.State.Flops++ } + graceful := peer.fsm.reason.Type == FSM_GRACEFUL_RESTART + peer.fsm.lock.Unlock() var drop 
[]bgp.RouteFamily - if peer.fsm.reason.Type == FSM_GRACEFUL_RESTART { + if graceful { + peer.fsm.lock.Lock() peer.fsm.pConf.GracefulRestart.State.PeerRestarting = true + peer.fsm.lock.Unlock() var p []bgp.RouteFamily p, drop = peer.forwardingPreservedFamilies() server.propagateUpdate(peer, peer.StaleAll(p)) @@ -1069,19 +1167,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { peer.prefixLimitWarned = make(map[bgp.RouteFamily]bool) peer.DropAll(drop) server.dropPeerAllRoutes(peer, drop) + + peer.fsm.lock.Lock() if peer.fsm.pConf.Config.PeerAs == 0 { peer.fsm.pConf.State.PeerAs = 0 peer.fsm.peerInfo.AS = 0 } + peer.fsm.lock.Unlock() + if peer.isDynamicNeighbor() { peer.stopPeerRestarting() go peer.stopFSM() + peer.fsm.lock.RLock() delete(server.neighborMap, peer.fsm.pConf.State.NeighborAddress) + peer.fsm.lock.RUnlock() server.broadcastPeerState(peer, oldState, e) return } - } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE { - if peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled { + } else if nextStateIdle { + peer.fsm.lock.RLock() + longLivedEnabled := peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled + peer.fsm.lock.RUnlock() + if longLivedEnabled { llgr, no_llgr := peer.llgrFamilies() peer.DropAll(no_llgr) @@ -1141,7 +1248,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // If the session does not get re-established within the "Restart Time" // that the peer advertised previously, the Receiving Speaker MUST // delete all the stale routes from the peer that it is retaining. + peer.fsm.lock.Lock() peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false + peer.fsm.lock.Unlock() peer.DropAll(peer.configuredRFlist()) server.dropPeerAllRoutes(peer, peer.configuredRFlist()) } @@ -1153,19 +1262,25 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // update for export policy laddr, _ := peer.fsm.LocalHostPort() // may include zone info + peer.fsm.lock.Lock() peer.fsm.pConf.Transport.State.LocalAddress = laddr // exclude zone info ipaddr, _ := net.ResolveIPAddr("ip", laddr) peer.fsm.peerInfo.LocalAddress = ipaddr.IP + neighborAddress := peer.fsm.pConf.State.NeighborAddress + peer.fsm.lock.Unlock() deferralExpiredFunc := func(family bgp.RouteFamily) func() { return func() { server.mgmtOperation(func() error { - server.softResetOut(peer.fsm.pConf.State.NeighborAddress, family, true) + server.softResetOut(neighborAddress, family, true) return nil }, false) } } - if !peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + peer.fsm.lock.RLock() + notLocalRestarting := !peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if notLocalRestarting { // When graceful-restart cap (which means intention // of sending EOR) and route-target address family are negotiated, // send route-target NLRIs first, and wait to send others @@ -1176,8 +1291,12 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // waiting sending non-route-target NLRIs since the peer won't send // any routes (and EORs) before we send ours (or deferral-timer expires). 
var pathList []*table.Path + peer.fsm.lock.RLock() _, y := peer.fsm.rfMap[bgp.RF_RTC_UC] - if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !peer.fsm.pConf.GracefulRestart.State.PeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 { + c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC) + notPeerRestarting := !peer.fsm.pConf.GracefulRestart.State.PeerRestarting + peer.fsm.lock.RUnlock() + if y && notPeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 { pathList, _ = server.getBestFromLocal(peer, []bgp.RouteFamily{bgp.RF_RTC_UC}) t := c.RouteTargetMembership.Config.DeferralTime for _, f := range peer.negotiatedRFList() { @@ -1211,7 +1330,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { }() if allEnd { for _, p := range server.neighborMap { + p.fsm.lock.Lock() p.fsm.pConf.GracefulRestart.State.LocalRestarting = false + p.fsm.lock.Unlock() if !p.isGracefulRestartEnabled() { continue } @@ -1224,7 +1345,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { "Topic": "Server", }).Info("sync finished") } else { + peer.fsm.lock.RLock() deferral := peer.fsm.pConf.GracefulRestart.Config.DeferralTime + peer.fsm.lock.RUnlock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -1236,7 +1359,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { if server.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE { die := true for _, p := range server.neighborMap { - if p.fsm.state != bgp.BGP_FSM_IDLE { + p.fsm.lock.RLock() + stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE + p.fsm.lock.RUnlock() + if stateNotIdle { die = false break } @@ -1245,19 +1371,30 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { server.shutdownWG.Done() } } + peer.fsm.lock.Lock() peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix() + peer.fsm.lock.Unlock() } // clear counter - if peer.fsm.adminState == ADMIN_STATE_DOWN { + peer.fsm.lock.RLock() + adminStateDown := peer.fsm.adminState == ADMIN_STATE_DOWN + peer.fsm.lock.RUnlock() + if adminStateDown { + peer.fsm.lock.Lock() peer.fsm.pConf.State = config.NeighborState{} peer.fsm.pConf.State.NeighborAddress = peer.fsm.pConf.Config.NeighborAddress peer.fsm.pConf.State.PeerAs = peer.fsm.pConf.Config.PeerAs peer.fsm.pConf.Timers.State = config.TimersState{} + peer.fsm.lock.Unlock() } peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, oldState, e) case FSM_MSG_ROUTE_REFRESH: - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime + peer.fsm.lock.RUnlock() + if notEstablished || beforeUptime { return } if paths := server.handleRouteRefresh(peer, e); len(paths) > 0 { @@ -1271,7 +1408,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { return case *bgp.BGPMessage: server.notifyRecvMessageWatcher(peer, e.timestamp, m) - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime + peer.fsm.lock.RUnlock() + if notEstablished || beforeUptime { return } pathList, eor, notification := peer.handleUpdate(e) @@ -1287,15 +1428,20 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { 
server.propagateUpdate(peer, pathList) } + peer.fsm.lock.RLock() + peerAfiSafis := peer.fsm.pConf.AfiSafis + peer.fsm.lock.RUnlock() if len(eor) > 0 { rtc := false for _, f := range eor { if f == bgp.RF_RTC_UC { rtc = true } - for i, a := range peer.fsm.pConf.AfiSafis { + for i, a := range peerAfiSafis { if a.State.Family == f { + peer.fsm.lock.Lock() peer.fsm.pConf.AfiSafis[i].MpGracefulRestart.State.EndOfRibReceived = true + peer.fsm.lock.Unlock() } } } @@ -1307,7 +1453,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // End-of-RIB marker from all its peers (excluding the ones with the // "Restart State" bit set in the received capability and excluding the // ones that do not advertise the graceful restart capability) or ...snip... - if peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + + peer.fsm.lock.RLock() + localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if localRestarting { allEnd := func() bool { for _, p := range server.neighborMap { if !p.recvedAllEOR() { @@ -1318,7 +1468,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { }() if allEnd { for _, p := range server.neighborMap { + p.fsm.lock.Lock() p.fsm.pConf.GracefulRestart.State.LocalRestarting = false + p.fsm.lock.Unlock() if !p.isGracefulRestartEnabled() { continue } @@ -1336,14 +1488,19 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // we don't delay non-route-target NLRIs when local-restarting rtc = false } - if peer.fsm.pConf.GracefulRestart.State.PeerRestarting { + peer.fsm.lock.RLock() + peerRestarting := peer.fsm.pConf.GracefulRestart.State.PeerRestarting + peer.fsm.lock.RUnlock() + if peerRestarting { if peer.recvedAllEOR() { peer.stopPeerRestarting() pathList := peer.adjRibIn.DropStale(peer.configuredRFlist()) + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Debugf("withdraw %d stale routes", len(pathList)) + peer.fsm.lock.RUnlock() server.propagateUpdate(peer, pathList) } @@ -1353,7 +1510,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { // received EOR of route-target address family // outbound filter is now ready, let's flash non-route-target NLRIs - if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 { + peer.fsm.lock.RLock() + c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC) + peer.fsm.lock.RUnlock() + if rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -1442,11 +1602,13 @@ func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) error { ap := make(map[string]config.ApplyPolicy, len(s.neighborMap)+1) ap[table.GLOBAL_RIB_NAME] = s.bgpConfig.Global.ApplyPolicy for _, peer := range s.neighborMap { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Info("call set policy") ap[peer.ID()] = peer.fsm.pConf.ApplyPolicy + peer.fsm.lock.RUnlock() } return s.policy.Reset(&policy, ap) }, false) @@ -1731,7 +1893,10 @@ func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInte func (s *BgpServer) DeleteVrf(name string) error { return s.mgmtOperation(func() error { for _, n := range s.neighborMap { - if n.fsm.pConf.Config.Vrf == name { + n.fsm.lock.RLock() + peerVrf := n.fsm.pConf.Config.Vrf + n.fsm.lock.RUnlock() + if peerVrf == name { return fmt.Errorf("failed to delete VRF %s: 
neighbor %s is in use", name, n.ID()) } } @@ -1792,7 +1957,11 @@ func (s *BgpServer) softResetIn(addr string, family bgp.RouteFamily) error { // route should be excluded from the Phase 2 decision function. isLooped := false if aspath := path.GetAsPath(); aspath != nil { - isLooped = hasOwnASLoop(peer.fsm.peerInfo.LocalAS, int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs), aspath) + peer.fsm.lock.RLock() + localAS := peer.fsm.peerInfo.LocalAS + allowOwnAS := int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs) + peer.fsm.lock.RUnlock() + isLooped = hasOwnASLoop(localAS, allowOwnAS, aspath) } if path.IsAsLooped() != isLooped { // can't modify the existing one. needs to create one @@ -1816,21 +1985,30 @@ func (s *BgpServer) softResetOut(addr string, family bgp.RouteFamily, deferral b return err } for _, peer := range peers { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + peer.fsm.lock.RUnlock() + if notEstablished { continue } families := familiesForSoftreset(peer, family) if deferral { + peer.fsm.lock.RLock() _, y := peer.fsm.rfMap[bgp.RF_RTC_UC] - if peer.fsm.pConf.GracefulRestart.State.LocalRestarting { + c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC) + restarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting + peer.fsm.lock.RUnlock() + if restarting { + peer.fsm.lock.Lock() peer.fsm.pConf.GracefulRestart.State.LocalRestarting = false log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), "Families": families, }).Debug("deferral timer expired") - } else if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !c.MpGracefulRestart.State.EndOfRibReceived { + peer.fsm.lock.Unlock() + } else if y && !c.MpGracefulRestart.State.EndOfRibReceived { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), @@ -2038,7 +2216,10 @@ func (s *BgpServer) GetNeighbor(address string, getAdvertised bool) (l []*config s.mgmtOperation(func() error { l = make([]*config.Neighbor, 0, len(s.neighborMap)) for k, peer := range s.neighborMap { - if address != "" && address != k && address != peer.fsm.pConf.Config.NeighborInterface { + peer.fsm.lock.RLock() + neighborIface := peer.fsm.pConf.Config.NeighborInterface + peer.fsm.lock.RUnlock() + if address != "" && address != k && address != neighborIface { continue } l = append(l, s.ToConfig(peer, getAdvertised)) @@ -2222,7 +2403,10 @@ func (s *BgpServer) DeletePeerGroup(c *config.PeerGroup) error { return s.mgmtOperation(func() error { name := c.Config.PeerGroupName for _, n := range s.neighborMap { - if n.fsm.pConf.Config.PeerGroup == name { + n.fsm.lock.RLock() + peerGroup := n.fsm.pConf.Config.PeerGroup + n.fsm.lock.RUnlock() + if peerGroup == name { return fmt.Errorf("failed to delete peer-group %s: neighbor %s is in use", name, n.ID()) } } @@ -2411,7 +2595,9 @@ func (s *BgpServer) ResetNeighbor(addr, communication string) error { } peers, _ := s.addrToPeers(addr) for _, peer := range peers { + peer.fsm.lock.Lock() peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset + peer.fsm.lock.Unlock() } return nil }, true) @@ -2426,12 +2612,16 @@ func (s *BgpServer) setAdminState(addr, communication string, enable bool) error f := func(stateOp *AdminStateOperation, message string) { select { case peer.fsm.adminStateCh <- *stateOp: + peer.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.fsm.pConf.State.NeighborAddress, }).Debug(message) + peer.fsm.lock.RUnlock() default: + peer.fsm.lock.RLock() log.Warning("previous 
request is still remaining. : ", peer.fsm.pConf.State.NeighborAddress) + peer.fsm.lock.RUnlock() } } if enable { @@ -2942,7 +3132,10 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } if w.opts.initPeerState { for _, peer := range s.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + peer.fsm.lock.RUnlock() + if notEstablished { continue } w.notify(newWatchEventPeerState(peer, nil)) @@ -2956,14 +3149,18 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } if w.opts.initUpdate { for _, peer := range s.neighborMap { - if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + peer.fsm.lock.RLock() + notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED + peer.fsm.lock.RUnlock() + if notEstablished { continue } configNeighbor := w.s.ToConfig(peer, false) for _, rf := range peer.configuredRFlist() { + peer.fsm.lock.RLock() _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - w.notify(&WatchEventUpdate{ + update := &WatchEventUpdate{ PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, PeerAddress: peer.fsm.peerInfo.Address, @@ -2974,11 +3171,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { PostPolicy: false, Neighbor: configNeighbor, PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false), - }) + } + peer.fsm.lock.RUnlock() + w.notify(update) eor := bgp.NewEndOfRib(rf) eorBuf, _ := eor.Serialize() - w.notify(&WatchEventUpdate{ + peer.fsm.lock.RLock() + update = &WatchEventUpdate{ Message: eor, PeerAS: peer.fsm.peerInfo.AS, LocalAS: peer.fsm.peerInfo.LocalAS, @@ -2991,7 +3191,9 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { Payload: eorBuf, PostPolicy: false, Neighbor: configNeighbor, - }) + } + peer.fsm.lock.RUnlock() + w.notify(update) } } } |
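To reproduce the CI check locally on an amd64 machine, the command in the `.travis.yml` hunk above reduces to roughly `go test -race $(go list ./... | grep -v '/vendor/') -timeout 120s`, which runs the whole unit-test suite under the race detector.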