summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorjhserrano <jhserrano.github@gmail.com>2018-07-09 19:48:57 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2018-07-19 22:23:29 +0900
commitfb999f325b7b160df371145eb6fb29fbb5c73f21 (patch)
tree40406cf792a56e41662b6d2e07dd8648064c5aa2
parent695fb5298edf8f912a9d11922e96f23c6464ba58 (diff)
fix races and enable race detector in unittest
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--.travis.yml2
-rw-r--r--pkg/server/fsm.go218
-rw-r--r--pkg/server/peer.go61
-rw-r--r--pkg/server/server.go300
4 files changed, 510 insertions, 71 deletions
diff --git a/.travis.yml b/.travis.yml
index 428513d8..22027f86 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,7 +8,7 @@ _dep_ensure: &_dep_ensure
_unittest: &_unittest
<<: *_dep_ensure
script:
- - go test $(go list ./... | grep -v '/vendor/') -timeout 120s
+ - go test $([ $(go env GOARCH) == 'amd64' ] && echo '-race') $(go list ./... | grep -v '/vendor/') -timeout 120s
- if [ "$(go env GOARCH)" = "amd64" ]; then go test -race github.com/osrg/gobgp/pkg/packet/bgp -run ^Test_RaceCondition$; else echo 'skip'; fi
_build: &_build
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go
index d0cc4f86..073cd88f 100644
--- a/pkg/server/fsm.go
+++ b/pkg/server/fsm.go
@@ -21,6 +21,7 @@ import (
"math/rand"
"net"
"strconv"
+ "sync"
"time"
"github.com/eapache/channels"
@@ -193,6 +194,7 @@ type FSM struct {
t tomb.Tomb
gConf *config.Global
pConf *config.Neighbor
+ lock sync.RWMutex
state bgp.FSMState
reason *FsmStateReason
conn net.Conn
@@ -215,6 +217,8 @@ type FSM struct {
}
func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
+ fsm.lock.Lock()
+ defer fsm.lock.Unlock()
state := &fsm.pConf.State.Messages
timer := &fsm.pConf.Timers
if isIn {
@@ -264,6 +268,8 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
func (fsm *FSM) bmpStatsUpdate(statType uint16, increment int) {
+ fsm.lock.Lock()
+ defer fsm.lock.Unlock()
stats := &fsm.pConf.State.Messages.Received
switch statType {
// TODO
@@ -305,6 +311,9 @@ func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
}
func (fsm *FSM) StateChange(nextState bgp.FSMState) {
+ fsm.lock.Lock()
+ defer fsm.lock.Unlock()
+
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -367,7 +376,11 @@ func (fsm *FSM) LocalHostPort() (string, uint16) {
}
func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) (*bgp.BGPMessage, error) {
- if fsm.h != nil && fsm.h.conn != nil {
+ fsm.lock.RLock()
+ established := fsm.h != nil && fsm.h.conn != nil
+ fsm.lock.RUnlock()
+
+ if established {
m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data)
b, _ := m.Serialize()
_, err := fsm.h.conn.Write(b)
@@ -392,7 +405,9 @@ func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) (
}
func (fsm *FSM) connectLoop() error {
+ fsm.lock.RLock()
tick := int(fsm.pConf.Timers.Config.ConnectRetry)
+ fsm.lock.RUnlock()
if tick < MIN_CONNECT_RETRY {
tick = MIN_CONNECT_RETRY
}
@@ -403,6 +418,9 @@ func (fsm *FSM) connectLoop() error {
timer.Stop()
connect := func() {
+ fsm.lock.RLock()
+ defer fsm.lock.RUnlock()
+
addr := fsm.pConf.State.NeighborAddress
port := int(bgp.BGP_PORT)
if fsm.pConf.Transport.Config.RemotePort != 0 {
@@ -460,13 +478,18 @@ func (fsm *FSM) connectLoop() error {
for {
select {
case <-fsm.t.Dying():
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
}).Debug("stop connect loop")
+ fsm.lock.RUnlock()
return nil
case <-timer.C:
- if fsm.state == bgp.BGP_FSM_ACTIVE {
+ fsm.lock.RLock()
+ ready := fsm.state == bgp.BGP_FSM_ACTIVE
+ fsm.lock.RUnlock()
+ if ready {
go connect()
}
case <-fsm.getActiveCh:
@@ -504,18 +527,27 @@ func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *F
func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
+ fsm.lock.RLock()
idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime))
+ fsm.lock.RUnlock()
+
for {
select {
case <-h.t.Dying():
return -1, NewFsmStateReason(FSM_DYING, nil, nil)
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
case conn, ok := <-fsm.connCh:
@@ -523,20 +555,28 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
+
case <-idleHoldTimer.C:
+ fsm.lock.RLock()
+ adminStateUp := fsm.adminState == ADMIN_STATE_UP
+ fsm.lock.RUnlock()
- if fsm.adminState == ADMIN_STATE_UP {
+ if adminStateUp {
+ fsm.lock.Lock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"Duration": fsm.idleHoldTime,
}).Debug("IdleHoldTimer expired")
fsm.idleHoldTime = HOLDTIME_IDLE
+ fsm.lock.Unlock()
return bgp.BGP_FSM_ACTIVE, NewFsmStateReason(FSM_IDLE_HOLD_TIMER_EXPIRED, nil, nil)
} else {
@@ -553,7 +593,9 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) {
case ADMIN_STATE_UP:
// restart idle hold timer
+ fsm.lock.RLock()
idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime))
+ fsm.lock.RUnlock()
}
}
}
@@ -570,9 +612,13 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) {
if !ok {
break
}
+ fsm.lock.Lock()
fsm.conn = conn
+ fsm.lock.Unlock()
ttl := 0
ttlMin := 0
+
+ fsm.lock.RLock()
if fsm.pConf.TtlSecurity.Config.Enabled {
ttl = 255
ttlMin = int(fsm.pConf.TtlSecurity.Config.TtlMin)
@@ -605,16 +651,22 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) {
}).Warnf("cannot set minimal TTL(=%d) for peer: %s", ttl, err)
}
}
+ fsm.lock.RUnlock()
// we don't implement delayed open timer so move to opensent right
// away.
return bgp.BGP_FSM_OPENSENT, NewFsmStateReason(FSM_NEW_CONNECTION, nil, nil)
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
case err := <-h.stateReasonCh:
@@ -791,6 +843,9 @@ func extractRouteFamily(p *bgp.PathAttributeInterface) *bgp.RouteFamily {
}
func (h *FSMHandler) afiSafiDisable(rf bgp.RouteFamily) string {
+ h.fsm.lock.Lock()
+ defer h.fsm.lock.Unlock()
+
n := bgp.AddressFamilyNameMap[rf]
for i, a := range h.fsm.pConf.AfiSafis {
@@ -817,36 +872,44 @@ func (h *FSMHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError b
handling = factor.ErrorHandling
switch handling {
case bgp.ERROR_HANDLING_ATTRIBUTE_DISCARD:
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": e,
}).Warn("Some attributes were discarded")
+ h.fsm.lock.RUnlock()
case bgp.ERROR_HANDLING_TREAT_AS_WITHDRAW:
m.Body = bgp.TreatAsWithdraw(m.Body.(*bgp.BGPUpdate))
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": e,
}).Warn("the received Update message was treated as withdraw")
+ h.fsm.lock.RUnlock()
case bgp.ERROR_HANDLING_AFISAFI_DISABLE:
rf := extractRouteFamily(factor.ErrorAttribute)
if rf == nil {
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
}).Warn("Error occurred during AFI/SAFI disabling")
+ h.fsm.lock.RUnlock()
} else {
n := h.afiSafiDisable(*rf)
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": e,
}).Warnf("Capability %s was disabled", n)
+ h.fsm.lock.RUnlock()
}
}
} else {
@@ -874,6 +937,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
err = hd.DecodeFromBytes(headerBuf)
if err != nil {
h.fsm.bgpMessageStateUpdate(0, true)
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
@@ -886,6 +950,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
MsgData: err,
Version: h.fsm.version,
}
+ h.fsm.lock.RUnlock()
return fmsg, err
}
@@ -896,10 +961,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
}
now := time.Now()
- useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw
handling := bgp.ERROR_HANDLING_NONE
- m, err := bgp.ParseBGPBody(hd, bodyBuf, h.fsm.marshallingOptions)
+ h.fsm.lock.RLock()
+ useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw
+ options := h.fsm.marshallingOptions
+ h.fsm.lock.RUnlock()
+
+ m, err := bgp.ParseBGPBody(hd, bodyBuf, options)
if err != nil {
handling = h.handlingError(m, err, useRevisedError)
h.fsm.bgpMessageStateUpdate(0, true)
@@ -907,29 +976,38 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
h.fsm.bgpMessageStateUpdate(m.Header.Type, true)
err = bgp.ValidateBGPMessage(m)
}
+ h.fsm.lock.RLock()
fmsg := &FsmMsg{
MsgType: FSM_MSG_BGP_MESSAGE,
MsgSrc: h.fsm.pConf.State.NeighborAddress,
timestamp: now,
Version: h.fsm.version,
}
+ h.fsm.lock.RUnlock()
switch handling {
case bgp.ERROR_HANDLING_AFISAFI_DISABLE:
fmsg.MsgData = m
return fmsg, nil
case bgp.ERROR_HANDLING_SESSION_RESET:
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": err,
}).Warn("Session will be reset due to malformed BGP message")
+ h.fsm.lock.RUnlock()
fmsg.MsgData = err
return fmsg, err
default:
fmsg.MsgData = m
- if h.fsm.state == bgp.BGP_FSM_ESTABLISHED {
+
+ h.fsm.lock.RLock()
+ establishedState := h.fsm.state == bgp.BGP_FSM_ESTABLISHED
+ h.fsm.lock.RUnlock()
+
+ if establishedState {
switch m.Header.Type {
case bgp.BGP_MSG_ROUTE_REFRESH:
fmsg.MsgType = FSM_MSG_ROUTE_REFRESH
@@ -942,17 +1020,22 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
copy(fmsg.payload, headerBuf)
copy(fmsg.payload[len(headerBuf):], bodyBuf)
- ok, err := bgp.ValidateUpdateMsg(body, h.fsm.rfMap, isEBGP, isConfed)
+ h.fsm.lock.RLock()
+ rfMap := h.fsm.rfMap
+ h.fsm.lock.RUnlock()
+ ok, err := bgp.ValidateUpdateMsg(body, rfMap, isEBGP, isConfed)
if !ok {
handling = h.handlingError(m, err, useRevisedError)
}
if handling == bgp.ERROR_HANDLING_SESSION_RESET {
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": err,
}).Warn("Session will be reset due to malformed BGP update message")
+ h.fsm.lock.RUnlock()
fmsg.MsgData = err
return fmsg, err
}
@@ -974,7 +1057,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
return fmsg, err
}
- fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo, fmsg.timestamp)
+ h.fsm.lock.RLock()
+ peerInfo := h.fsm.peerInfo
+ h.fsm.lock.RUnlock()
+ fmsg.PathList = table.ProcessMessage(m, peerInfo, fmsg.timestamp)
fallthrough
case bgp.BGP_MSG_KEEPALIVE:
// if the length of h.holdTimerResetCh
@@ -991,6 +1077,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
body := m.Body.(*bgp.BGPNotification)
if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) {
communication, rest := decodeAdministrativeCommunication(body.Data)
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
@@ -999,7 +1086,9 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
"Communicated-Reason": communication,
"Data": rest,
}).Warn("received notification")
+ h.fsm.lock.RUnlock()
} else {
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
@@ -1007,9 +1096,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
"Subcode": body.ErrorSubcode,
"Data": body.Data,
}).Warn("received notification")
+ h.fsm.lock.RUnlock()
}
- if s := h.fsm.pConf.GracefulRestart.State; s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET {
+ h.fsm.lock.RLock()
+ s := h.fsm.pConf.GracefulRestart.State
+ hardReset := s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET
+ h.fsm.lock.RUnlock()
+ if hardReset {
sendToStateReasonCh(FSM_HARD_RESET, m)
} else {
sendToStateReasonCh(FSM_NOTIFICATION_RECV, m)
@@ -1089,13 +1183,20 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode]
func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
+
+ fsm.lock.RLock()
m := buildopen(fsm.gConf, fsm.pConf)
+ fsm.lock.RUnlock()
+
b, _ := m.Serialize()
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
h.msgCh = channels.NewInfiniteChannel()
+
+ fsm.lock.RLock()
h.conn = fsm.conn
+ fsm.lock.RUnlock()
h.t.Go(h.recvMessage)
@@ -1103,7 +1204,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
// sets its HoldTimer to a large value
// A HoldTimer value of 4 minutes is suggested as a "large value"
// for the HoldTimer
+ fsm.lock.RLock()
holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime))
+ fsm.lock.RUnlock()
for {
select {
@@ -1115,18 +1218,25 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
h.conn.Close()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
@@ -1139,16 +1249,27 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
if m.Header.Type == bgp.BGP_MSG_OPEN {
+ fsm.lock.Lock()
fsm.recvOpen = m
+ fsm.lock.Unlock()
+
body := m.Body.(*bgp.BGPOpen)
- peerAs, err := bgp.ValidateOpenMsg(body, fsm.pConf.Config.PeerAs)
+
+ fsm.lock.RLock()
+ fsmPeerAS := fsm.pConf.Config.PeerAs
+ fsm.lock.RUnlock()
+ peerAs, err := bgp.ValidateOpenMsg(body, fsmPeerAS)
if err != nil {
m, _ := fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError))
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil)
}
// ASN negotiation was skipped
- if fsm.pConf.Config.PeerAs == 0 {
+ fsm.lock.RLock()
+ asnNegotiationSkipped := fsm.pConf.Config.PeerAs == 0
+ fsm.lock.RUnlock()
+ if asnNegotiationSkipped {
+ fsm.lock.Lock()
typ := config.PEER_TYPE_EXTERNAL
if fsm.peerInfo.LocalAS == peerAs {
typ = config.PEER_TYPE_INTERNAL
@@ -1159,9 +1280,13 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Infof("skipped asn negotiation: peer-as: %d, peer-type: %s", peerAs, typ)
+ fsm.lock.Unlock()
} else {
+ fsm.lock.Lock()
fsm.pConf.State.PeerType = fsm.pConf.Config.PeerType
+ fsm.lock.Unlock()
}
+ fsm.lock.Lock()
fsm.pConf.State.PeerAs = peerAs
fsm.peerInfo.AS = peerAs
fsm.peerInfo.ID = body.ID
@@ -1264,6 +1389,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
}
}
+ fsm.lock.Unlock()
msg := bgp.NewBGPKeepAliveMessage()
b, _ := msg.Serialize()
fsm.conn.Write(b)
@@ -1313,6 +1439,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
}
func keepaliveTicker(fsm *FSM) *time.Ticker {
+ fsm.lock.RLock()
+ defer fsm.lock.RUnlock()
+
negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime
if negotiatedTime == 0 {
return &time.Ticker{}
@@ -1328,6 +1457,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
h.msgCh = channels.NewInfiniteChannel()
+ fsm.lock.RLock()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -1340,6 +1470,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
// sets the HoldTimer according to the negotiated value
holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
}
+ fsm.lock.RUnlock()
for {
select {
@@ -1351,18 +1482,25 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
h.conn.Close()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
@@ -1429,6 +1567,7 @@ func (h *FSMHandler) sendMessageloop() error {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
send := func(m *bgp.BGPMessage) error {
+ fsm.lock.RLock()
if fsm.twoByteAsTrans && m.Header.Type == bgp.BGP_MSG_UPDATE {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -1440,29 +1579,37 @@ func (h *FSMHandler) sendMessageloop() error {
table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate))
}
b, err := m.Serialize(h.fsm.marshallingOptions)
+ fsm.lock.RUnlock()
if err != nil {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
"Data": err,
}).Warn("failed to serialize")
+ fsm.lock.RUnlock()
fsm.bgpMessageStateUpdate(0, false)
return nil
}
- if err := conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))); err != nil {
+ fsm.lock.RLock()
+ err = conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)))
+ fsm.lock.RUnlock()
+ if err != nil {
h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil)
conn.Close()
return fmt.Errorf("failed to set write deadline")
}
_, err = conn.Write(b)
if err != nil {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
"Data": err,
}).Warn("failed to send")
+ fsm.lock.RUnlock()
h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil)
conn.Close()
return fmt.Errorf("closed")
@@ -1474,6 +1621,7 @@ func (h *FSMHandler) sendMessageloop() error {
body := m.Body.(*bgp.BGPNotification)
if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) {
communication, rest := decodeAdministrativeCommunication(body.Data)
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -1483,7 +1631,9 @@ func (h *FSMHandler) sendMessageloop() error {
"Communicated-Reason": communication,
"Data": rest,
}).Warn("sent notification")
+ fsm.lock.RUnlock()
} else {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -1492,12 +1642,14 @@ func (h *FSMHandler) sendMessageloop() error {
"Subcode": body.ErrorSubcode,
"Data": body.Data,
}).Warn("sent notification")
+ fsm.lock.RUnlock()
}
h.stateReasonCh <- *NewFsmStateReason(FSM_NOTIFICATION_SENT, m, nil)
conn.Close()
return fmt.Errorf("closed")
case bgp.BGP_MSG_UPDATE:
update := m.Body.(*bgp.BGPUpdate)
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -1506,13 +1658,16 @@ func (h *FSMHandler) sendMessageloop() error {
"withdrawals": update.WithdrawnRoutes,
"attributes": update.PathAttributes,
}).Debug("sent update")
+ fsm.lock.RUnlock()
default:
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
"data": m,
}).Debug("sent")
+ fsm.lock.RUnlock()
}
return nil
}
@@ -1523,7 +1678,10 @@ func (h *FSMHandler) sendMessageloop() error {
return nil
case o := <-h.outgoing.Out():
m := o.(*FsmOutgoingMsg)
- for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, h.fsm.marshallingOptions) {
+ h.fsm.lock.RLock()
+ options := h.fsm.marshallingOptions
+ h.fsm.lock.RUnlock()
+ for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
if err := send(msg); err != nil {
return nil
}
@@ -1560,7 +1718,9 @@ func (h *FSMHandler) recvMessageloop() error {
func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
+ fsm.lock.Lock()
h.conn = fsm.conn
+ fsm.lock.Unlock()
h.t.Go(h.sendMessageloop)
h.msgCh = h.incoming
h.t.Go(h.recvMessageloop)
@@ -1569,7 +1729,9 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
holdTimer = &time.Timer{}
} else {
+ fsm.lock.RLock()
holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
+ fsm.lock.RUnlock()
}
fsm.gracefulRestartTimer.Stop()
@@ -1583,14 +1745,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
case err := <-h.stateReasonCh:
h.conn.Close()
h.t.Kill(nil)
+ fsm.lock.RLock()
if s := fsm.pConf.GracefulRestart.State; s.Enabled &&
(s.NotificationEnabled && err.Type == FSM_NOTIFICATION_RECV ||
err.Type == FSM_READ_FAILED ||
@@ -1603,20 +1768,25 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
}).Info("peer graceful restart")
fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second)
}
+ fsm.lock.RUnlock()
return bgp.BGP_FSM_IDLE, &err
case <-holdTimer.C:
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("hold timer expired")
+ fsm.lock.RUnlock()
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil)
h.outgoing.In() <- &FsmOutgoingMsg{Notification: m}
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil)
case <-h.holdTimerResetCh:
+ fsm.lock.RLock()
if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 {
holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
}
+ fsm.lock.RUnlock()
case stateOp := <-fsm.adminStateCh:
err := h.changeAdminState(stateOp.State)
if err == nil {
@@ -1633,12 +1803,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
func (h *FSMHandler) loop() error {
fsm := h.fsm
ch := make(chan bgp.FSMState)
+ fsm.lock.RLock()
oldState := fsm.state
+ fsm.lock.RUnlock()
var reason *FsmStateReason
f := func() error {
nextState := bgp.FSMState(-1)
- switch fsm.state {
+ fsm.lock.RLock()
+ fsmState := fsm.state
+ fsm.lock.RUnlock()
+ switch fsmState {
case bgp.BGP_FSM_IDLE:
nextState, reason = h.idle()
// case bgp.BGP_FSM_CONNECT:
@@ -1661,6 +1836,7 @@ func (h *FSMHandler) loop() error {
nextState := <-ch
+ fsm.lock.RLock()
if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -1685,6 +1861,7 @@ func (h *FSMHandler) loop() error {
"Reason": reason.String(),
}).Info("Peer Down")
}
+ fsm.lock.RUnlock()
e := time.AfterFunc(time.Second*120, func() {
log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState)
@@ -1694,6 +1871,7 @@ func (h *FSMHandler) loop() error {
// under zero means that tomb.Dying()
if nextState >= bgp.BGP_FSM_IDLE {
+ fsm.lock.RLock()
h.stateCh <- &FsmMsg{
MsgType: FSM_MSG_STATE_CHANGE,
MsgSrc: fsm.pConf.State.NeighborAddress,
@@ -1701,11 +1879,15 @@ func (h *FSMHandler) loop() error {
StateReason: reason,
Version: h.fsm.version,
}
+ fsm.lock.RUnlock()
}
return nil
}
func (h *FSMHandler) changeAdminState(s AdminState) error {
+ h.fsm.lock.Lock()
+ defer h.fsm.lock.Unlock()
+
fsm := h.fsm
if fsm.adminState != s {
log.WithFields(log.Fields{
diff --git a/pkg/server/peer.go b/pkg/server/peer.go
index 1f3dd8d8..2f7d6cc2 100644
--- a/pkg/server/peer.go
+++ b/pkg/server/peer.go
@@ -88,7 +88,9 @@ func newDynamicPeer(g *config.Global, neighborAddress string, pg *config.PeerGro
return nil
}
peer := NewPeer(g, &conf, loc, policy)
+ peer.fsm.lock.Lock()
peer.fsm.state = bgp.BGP_FSM_ACTIVE
+ peer.fsm.lock.Unlock()
return peer
}
@@ -122,10 +124,14 @@ func NewPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, p
}
func (peer *Peer) AS() uint32 {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
return peer.fsm.pConf.State.PeerAs
}
func (peer *Peer) ID() string {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
return peer.fsm.pConf.State.NeighborAddress
}
@@ -138,18 +144,26 @@ func (peer *Peer) isIBGPPeer() bool {
}
func (peer *Peer) isRouteServerClient() bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
return peer.fsm.pConf.RouteServer.Config.RouteServerClient
}
func (peer *Peer) isRouteReflectorClient() bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
return peer.fsm.pConf.RouteReflector.Config.RouteReflectorClient
}
func (peer *Peer) isGracefulRestartEnabled() bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
return peer.fsm.pConf.GracefulRestart.State.Enabled
}
func (peer *Peer) getAddPathMode(family bgp.RouteFamily) bgp.BGPAddPathMode {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
if mode, y := peer.fsm.rfMap[family]; y {
return mode
}
@@ -165,10 +179,14 @@ func (peer *Peer) isAddPathSendEnabled(family bgp.RouteFamily) bool {
}
func (peer *Peer) isDynamicNeighbor() bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
return peer.fsm.pConf.Config.NeighborAddress == "" && peer.fsm.pConf.Config.NeighborInterface == ""
}
func (peer *Peer) recvedAllEOR() bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
for _, a := range peer.fsm.pConf.AfiSafis {
if s := a.MpGracefulRestart.State; s.Enabled && !s.EndOfRibReceived {
return false
@@ -178,11 +196,15 @@ func (peer *Peer) recvedAllEOR() bool {
}
func (peer *Peer) configuredRFlist() []bgp.RouteFamily {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
rfs, _ := config.AfiSafis(peer.fsm.pConf.AfiSafis).ToRfList()
return rfs
}
func (peer *Peer) negotiatedRFList() []bgp.RouteFamily {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
l := make([]bgp.RouteFamily, 0, len(peer.fsm.rfMap))
for family, _ := range peer.fsm.rfMap {
l = append(l, family)
@@ -191,6 +213,8 @@ func (peer *Peer) negotiatedRFList() []bgp.RouteFamily {
}
func (peer *Peer) toGlobalFamilies(families []bgp.RouteFamily) []bgp.RouteFamily {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
if peer.fsm.pConf.Config.Vrf != "" {
fs := make([]bgp.RouteFamily, 0, len(families))
for _, f := range families {
@@ -233,26 +257,32 @@ func classifyFamilies(all, part []bgp.RouteFamily) ([]bgp.RouteFamily, []bgp.Rou
}
func (peer *Peer) forwardingPreservedFamilies() ([]bgp.RouteFamily, []bgp.RouteFamily) {
+ peer.fsm.lock.RLock()
list := []bgp.RouteFamily{}
for _, a := range peer.fsm.pConf.AfiSafis {
if s := a.MpGracefulRestart.State; s.Enabled && s.Received {
list = append(list, a.State.Family)
}
}
+ peer.fsm.lock.RUnlock()
return classifyFamilies(peer.configuredRFlist(), list)
}
func (peer *Peer) llgrFamilies() ([]bgp.RouteFamily, []bgp.RouteFamily) {
+ peer.fsm.lock.RLock()
list := []bgp.RouteFamily{}
for _, a := range peer.fsm.pConf.AfiSafis {
if a.LongLivedGracefulRestart.State.Enabled {
list = append(list, a.State.Family)
}
}
+ peer.fsm.lock.RUnlock()
return classifyFamilies(peer.configuredRFlist(), list)
}
func (peer *Peer) isLLGREnabledFamily(family bgp.RouteFamily) bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
if !peer.fsm.pConf.GracefulRestart.Config.LongLivedEnabled {
return false
}
@@ -266,6 +296,8 @@ func (peer *Peer) isLLGREnabledFamily(family bgp.RouteFamily) bool {
}
func (peer *Peer) llgrRestartTime(family bgp.RouteFamily) uint32 {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
for _, a := range peer.fsm.pConf.AfiSafis {
if a.State.Family == family {
return a.LongLivedGracefulRestart.State.PeerRestartTime
@@ -275,6 +307,8 @@ func (peer *Peer) llgrRestartTime(family bgp.RouteFamily) uint32 {
}
func (peer *Peer) llgrRestartTimerExpired(family bgp.RouteFamily) bool {
+ peer.fsm.lock.RLock()
+ defer peer.fsm.lock.RUnlock()
all := true
for _, a := range peer.fsm.pConf.AfiSafis {
if a.State.Family == family {
@@ -309,6 +343,8 @@ func (peer *Peer) markLLGRStale(fs []bgp.RouteFamily) []*table.Path {
}
func (peer *Peer) stopPeerRestarting() {
+ peer.fsm.lock.Lock()
+ defer peer.fsm.lock.Unlock()
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false
for _, ch := range peer.llgrEndChs {
close(ch)
@@ -378,7 +414,9 @@ func (peer *Peer) doPrefixLimit(k bgp.RouteFamily, c *config.PrefixLimitConfig)
}
func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) error {
+ peer.fsm.lock.RLock()
x := peer.fsm.pConf.AfiSafis
+ peer.fsm.lock.RUnlock()
y := c
if len(x) != len(y) {
return fmt.Errorf("changing supported afi-safi is not allowed")
@@ -406,7 +444,9 @@ func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) error {
}
}
}
+ peer.fsm.lock.Lock()
peer.fsm.pConf.AfiSafis = c
+ peer.fsm.lock.Unlock()
return nil
}
@@ -420,7 +460,9 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg
"withdrawals": update.WithdrawnRoutes,
"attributes": update.PathAttributes,
}).Debug("received update")
+ peer.fsm.lock.Lock()
peer.fsm.pConf.Timers.State.UpdateRecvTime = time.Now().Unix()
+ peer.fsm.lock.Unlock()
if len(e.PathList) > 0 {
paths := make([]*table.Path, 0, len(e.PathList))
eor := []bgp.RouteFamily{}
@@ -440,7 +482,11 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg
// If the AS_PATH attribute of a BGP route contains an AS loop, the BGP
// route should be excluded from the Phase 2 decision function.
if aspath := path.GetAsPath(); aspath != nil {
- if hasOwnASLoop(peer.fsm.peerInfo.LocalAS, int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs), aspath) {
+ peer.fsm.lock.RLock()
+ localAS := peer.fsm.peerInfo.LocalAS
+ allowOwnAS := int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs)
+ peer.fsm.lock.RUnlock()
+ if hasOwnASLoop(localAS, allowOwnAS, aspath) {
path.SetAsLooped(true)
continue
}
@@ -448,7 +494,10 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg
paths = append(paths, path)
}
peer.adjRibIn.Update(e.PathList)
- for _, af := range peer.fsm.pConf.AfiSafis {
+ peer.fsm.lock.RLock()
+ peerAfiSafis := peer.fsm.pConf.AfiSafis
+ peer.fsm.lock.RUnlock()
+ for _, af := range peerAfiSafis {
if msg := peer.doPrefixLimit(af.State.Family, &af.PrefixLimit.Config); msg != nil {
return nil, nil, msg
}
@@ -459,7 +508,10 @@ func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg
}
func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) {
- peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
+ handler := NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
+ peer.fsm.lock.Lock()
+ peer.fsm.h = handler
+ peer.fsm.lock.Unlock()
}
func (peer *Peer) StaleAll(rfList []bgp.RouteFamily) []*table.Path {
@@ -484,13 +536,16 @@ func (peer *Peer) DropAll(rfList []bgp.RouteFamily) {
func (peer *Peer) stopFSM() error {
failed := false
+ peer.fsm.lock.RLock()
addr := peer.fsm.pConf.State.NeighborAddress
+ peer.fsm.lock.RUnlock()
t1 := time.AfterFunc(time.Minute*5, func() {
log.WithFields(log.Fields{
"Topic": "Peer",
}).Warnf("Failed to free the fsm.h.t for %s", addr)
failed = true
})
+
peer.fsm.h.t.Kill(nil)
peer.fsm.h.t.Wait()
t1.Stop()
diff --git a/pkg/server/server.go b/pkg/server/server.go
index b52ad284..78f0c254 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -195,7 +195,10 @@ func (server *BgpServer) Serve() {
}).Warnf("Can't find the neighbor %s", e.MsgSrc)
return
}
- if e.Version != peer.fsm.version {
+ peer.fsm.lock.RLock()
+ versionMismatch := e.Version != peer.fsm.version
+ peer.fsm.lock.RUnlock()
+ if versionMismatch {
log.WithFields(log.Fields{
"Topic": "Peer",
}).Debug("FSM version inconsistent")
@@ -211,15 +214,23 @@ func (server *BgpServer) Serve() {
remoteAddr := ipaddr.String()
peer, found := server.neighborMap[remoteAddr]
if found {
- if peer.fsm.adminState != ADMIN_STATE_UP {
+ peer.fsm.lock.RLock()
+ adminStateNotUp := peer.fsm.adminState != ADMIN_STATE_UP
+ peer.fsm.lock.RUnlock()
+ if adminStateNotUp {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Remote Addr": remoteAddr,
"Admin State": peer.fsm.adminState,
}).Debug("New connection for non admin-state-up peer")
+ peer.fsm.lock.RUnlock()
conn.Close()
return
}
+ peer.fsm.lock.RLock()
+ localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
+ peer.fsm.lock.RUnlock()
localAddrValid := func(laddr string) bool {
if laddr == "0.0.0.0" || laddr == "::" {
return true
@@ -241,7 +252,7 @@ func (server *BgpServer) Serve() {
return false
}
return true
- }(peer.fsm.pConf.Transport.Config.LocalAddress)
+ }(localAddr)
if !localAddrValid {
conn.Close()
@@ -269,7 +280,10 @@ func (server *BgpServer) Serve() {
conn.Close()
return
}
- server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): peer.fsm.pConf.ApplyPolicy})
+ peer.fsm.lock.RLock()
+ policy := peer.fsm.pConf.ApplyPolicy
+ peer.fsm.lock.RUnlock()
+ server.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
server.neighborMap[remoteAddr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
@@ -357,12 +371,19 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path {
if path == nil {
return nil
}
- if _, ok := peer.fsm.rfMap[path.GetRouteFamily()]; !ok {
+
+ peer.fsm.lock.RLock()
+ _, ok := peer.fsm.rfMap[path.GetRouteFamily()]
+ peer.fsm.lock.RUnlock()
+ if !ok {
return nil
}
//RFC4684 Constrained Route Distribution
- if _, y := peer.fsm.rfMap[bgp.RF_RTC_UC]; y && path.GetRouteFamily() != bgp.RF_RTC_UC {
+ peer.fsm.lock.RLock()
+ _, y := peer.fsm.rfMap[bgp.RF_RTC_UC]
+ peer.fsm.lock.RUnlock()
+ if y && path.GetRouteFamily() != bgp.RF_RTC_UC {
ignore := true
for _, ext := range path.GetExtCommunities() {
for _, p := range peer.adjRibIn.PathList([]bgp.RouteFamily{bgp.RF_RTC_UC}, true) {
@@ -400,7 +421,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path {
// RFC4456 8. Avoiding Routing Information Loops
// A router that recognizes the ORIGINATOR_ID attribute SHOULD
// ignore a route received with its BGP Identifier as the ORIGINATOR_ID.
- if id := path.GetOriginatorID(); peer.fsm.gConf.Config.RouterId == id.String() {
+ peer.fsm.lock.RLock()
+ routerId := peer.fsm.gConf.Config.RouterId
+ peer.fsm.lock.RUnlock()
+ if id := path.GetOriginatorID(); routerId == id.String() {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -417,7 +441,10 @@ func filterpath(peer *Peer, path, old *table.Path) *table.Path {
// If the local CLUSTER_ID is found in the CLUSTER_LIST,
// the advertisement received SHOULD be ignored.
for _, clusterID := range path.GetClusterList() {
- if clusterID.Equal(peer.fsm.peerInfo.RouteReflectorClusterID) {
+ peer.fsm.lock.RLock()
+ rrClusterID := peer.fsm.peerInfo.RouteReflectorClusterID
+ peer.fsm.lock.RUnlock()
+ if clusterID.Equal(rrClusterID) {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -472,11 +499,13 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
// assumes "path" was already sent before. This assumption avoids the
// infinite UPDATE loop between Route Reflector and its clients.
if path.IsLocal() && path.Equal(old) {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
"Path": path,
}).Debug("given rtm nlri is already sent, skipping to advertise")
+ peer.fsm.lock.RUnlock()
return nil
}
@@ -512,11 +541,14 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
// only allow vpnv4 and vpnv6 paths to be advertised to VRFed neighbors.
// also check we can import this path using table.CanImportToVrf()
// if we can, make it local path by calling (*Path).ToLocal()
- if path != nil && peer.fsm.pConf.Config.Vrf != "" {
+ peer.fsm.lock.RLock()
+ peerVrf := peer.fsm.pConf.Config.Vrf
+ peer.fsm.lock.RUnlock()
+ if path != nil && peerVrf != "" {
if f := path.GetRouteFamily(); f != bgp.RF_IPv4_VPN && f != bgp.RF_IPv6_VPN {
return nil
}
- vrf := peer.localRib.Vrfs[peer.fsm.pConf.Config.Vrf]
+ vrf := peer.localRib.Vrfs[peerVrf]
if table.CanImportToVrf(vrf, path) {
path = path.ToLocal()
} else {
@@ -525,14 +557,17 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
}
// replace-peer-as handling
+ peer.fsm.lock.RLock()
if path != nil && !path.IsWithdraw && peer.fsm.pConf.AsPathOptions.State.ReplacePeerAs {
path = path.ReplaceAS(peer.fsm.pConf.Config.LocalAs, peer.fsm.pConf.Config.PeerAs)
}
+ peer.fsm.lock.RUnlock()
if path = filterpath(peer, path, old); path == nil {
return nil
}
+ peer.fsm.lock.RLock()
options := &table.PolicyOptions{
Info: peer.fsm.peerInfo,
OldNextHop: path.GetNexthop(),
@@ -542,6 +577,7 @@ func (s *BgpServer) filterpath(peer *Peer, path, old *table.Path) *table.Path {
if v := s.roaManager.validate(path); v != nil {
options.ValidationResult = v
}
+ peer.fsm.lock.RUnlock()
path = peer.policy.ApplyPolicy(peer.TableID(), table.POLICY_DIRECTION_EXPORT, path, options)
// When 'path' is filtered (path == nil), check 'old' has been sent to this peer.
@@ -616,10 +652,14 @@ func (server *BgpServer) notifyBestWatcher(best []*table.Path, multipath [][]*ta
func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
// create copy which can be access to without mutex
+ peer.fsm.lock.RLock()
conf := *peer.fsm.pConf
+ peerAfiSafis := peer.fsm.pConf.AfiSafis
+ peerCapMap := peer.fsm.capMap
+ peer.fsm.lock.RUnlock()
- conf.AfiSafis = make([]config.AfiSafi, len(peer.fsm.pConf.AfiSafis))
- for i, af := range peer.fsm.pConf.AfiSafis {
+ conf.AfiSafis = make([]config.AfiSafi, len(peerAfiSafis))
+ for i, af := range peerAfiSafis {
conf.AfiSafis[i] = af
conf.AfiSafis[i].AddPaths.State.Receive = peer.isAddPathReceiveEnabled(af.State.Family)
if peer.isAddPathSendEnabled(af.State.Family) {
@@ -629,8 +669,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
}
}
- remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peer.fsm.capMap))
- for _, caps := range peer.fsm.capMap {
+ remoteCap := make([]bgp.ParameterCapabilityInterface, 0, len(peerCapMap))
+ for _, caps := range peerCapMap {
for _, m := range caps {
// need to copy all values here
buf, _ := m.Serialize()
@@ -638,6 +678,8 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
remoteCap = append(remoteCap, c)
}
}
+
+ peer.fsm.lock.RLock()
conf.State.RemoteCapabilityList = remoteCap
conf.State.LocalCapabilityList = capabilitiesFromConfig(peer.fsm.pConf)
@@ -663,6 +705,7 @@ func (s *BgpServer) ToConfig(peer *Peer, getAdvertised bool) *config.Neighbor {
conf.State.ReceivedOpenMessage, _ = bgp.ParseBGPMessage(buf)
conf.State.RemoteRouterId = peer.fsm.peerInfo.ID.To4().String()
}
+ peer.fsm.lock.RUnlock()
return &conf
}
@@ -670,10 +713,12 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta
if !server.isWatched(WATCH_EVENT_TYPE_PRE_UPDATE) || peer == nil {
return
}
+
cloned := clonePathList(pathList)
if len(cloned) == 0 {
return
}
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &WatchEventUpdate{
@@ -690,6 +735,7 @@ func (server *BgpServer) notifyPrePolicyUpdateWatcher(peer *Peer, pathList []*ta
PathList: cloned,
Neighbor: server.ToConfig(peer, false),
}
+ peer.fsm.lock.RUnlock()
server.notifyWatcher(WATCH_EVENT_TYPE_PRE_UPDATE, ev)
}
@@ -697,10 +743,12 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t
if !server.isWatched(WATCH_EVENT_TYPE_POST_UPDATE) || peer == nil {
return
}
+
cloned := clonePathList(pathList)
if len(cloned) == 0 {
return
}
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &WatchEventUpdate{
@@ -715,6 +763,7 @@ func (server *BgpServer) notifyPostPolicyUpdateWatcher(peer *Peer, pathList []*t
PathList: cloned,
Neighbor: server.ToConfig(peer, false),
}
+ peer.fsm.lock.RUnlock()
server.notifyWatcher(WATCH_EVENT_TYPE_POST_UPDATE, ev)
}
@@ -722,6 +771,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState {
_, rport := peer.fsm.RemoteHostPort()
laddr, lport := peer.fsm.LocalHostPort()
sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ peer.fsm.lock.RLock()
recvOpen := peer.fsm.recvOpen
e := &WatchEventPeerState{
PeerAS: peer.fsm.peerInfo.AS,
@@ -738,6 +788,7 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState {
Timestamp: time.Now(),
PeerInterface: peer.fsm.pConf.Config.NeighborInterface,
}
+ peer.fsm.lock.RUnlock()
if m != nil {
e.StateReason = m.StateReason
@@ -746,7 +797,9 @@ func newWatchEventPeerState(peer *Peer, m *FsmMsg) *WatchEventPeerState {
}
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e *FsmMsg) {
+ peer.fsm.lock.RLock()
newState := peer.fsm.state
+ peer.fsm.lock.RUnlock()
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
server.notifyWatcher(WATCH_EVENT_TYPE_PEER_STATE, newWatchEventPeerState(peer, e))
}
@@ -754,6 +807,7 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState, e
func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, msg *bgp.BGPMessage, isSent bool) {
// validation should be done in the caller of this function
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &WatchEventMessage{
@@ -767,6 +821,7 @@ func (server *BgpServer) notifyMessageWatcher(peer *Peer, timestamp time.Time, m
Timestamp: timestamp,
IsSent: isSent,
}
+ peer.fsm.lock.RUnlock()
if !isSent {
server.notifyWatcher(WATCH_EVENT_TYPE_RECV_MSG, ev)
}
@@ -806,14 +861,20 @@ func (s *BgpServer) getBestFromLocal(peer *Peer, rfList []bgp.RouteFamily) ([]*t
}
func (s *BgpServer) processOutgoingPaths(peer *Peer, paths, olds []*table.Path) []*table.Path {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
return nil
}
- if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+ if localRestarting {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Debug("now syncing, suppress sending updates")
+ peer.fsm.lock.RUnlock()
return nil
}
@@ -835,7 +896,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path {
m := e.MsgData.(*bgp.BGPMessage)
rr := m.Body.(*bgp.BGPRouteRefresh)
rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI)
- if _, ok := peer.fsm.rfMap[rf]; !ok {
+
+ peer.fsm.lock.RLock()
+ _, ok := peer.fsm.rfMap[rf]
+ peer.fsm.lock.RUnlock()
+ if !ok {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -843,7 +908,11 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path {
}).Warn("Route family isn't supported")
return nil
}
- if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok {
+
+ peer.fsm.lock.RLock()
+ _, ok = peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]
+ peer.fsm.lock.RUnlock()
+ if !ok {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -861,7 +930,12 @@ func (s *BgpServer) handleRouteRefresh(peer *Peer, e *FsmMsg) []*table.Path {
func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) {
rs := peer != nil && peer.isRouteServerClient()
- vrf := !rs && peer != nil && peer.fsm.pConf.Config.Vrf != ""
+ vrf := false
+ if peer != nil {
+ peer.fsm.lock.RLock()
+ vrf = !rs && peer.fsm.pConf.Config.Vrf != ""
+ peer.fsm.lock.RUnlock()
+ }
tableId := table.GLOBAL_RIB_NAME
rib := server.globalRib
@@ -872,13 +946,18 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) {
for _, path := range pathList {
if vrf {
- path = path.ToGlobal(rib.Vrfs[peer.fsm.pConf.Config.Vrf])
+ peer.fsm.lock.RLock()
+ peerVrf := peer.fsm.pConf.Config.Vrf
+ peer.fsm.lock.RUnlock()
+ path = path.ToGlobal(rib.Vrfs[peerVrf])
}
policyOptions := &table.PolicyOptions{}
if !rs && peer != nil {
+ peer.fsm.lock.RLock()
policyOptions.Info = peer.fsm.peerInfo
+ peer.fsm.lock.RUnlock()
}
if v := server.roaManager.validate(path); v != nil {
policyOptions.ValidationResult = v
@@ -949,12 +1028,16 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) {
}
func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) {
+ peer.fsm.lock.RLock()
+ peerInfo := peer.fsm.peerInfo
+ peer.fsm.lock.RUnlock()
+
rib := server.globalRib
if peer.isRouteServerClient() {
rib = server.rsRib
}
for _, family := range peer.toGlobalFamilies(families) {
- for _, path := range rib.GetPathListByPeer(peer.fsm.peerInfo, family) {
+ for _, path := range rib.GetPathListByPeer(peerInfo, family) {
p := path.Clone(true)
if dsts := rib.Update(p); len(dsts) > 0 {
server.propagateUpdateToNeighbors(peer, p, dsts, false)
@@ -995,7 +1078,10 @@ func (server *BgpServer) propagateUpdateToNeighbors(source *Peer, newPath *table
continue
}
f := func() bgp.RouteFamily {
- if targetPeer.fsm.pConf.Config.Vrf != "" {
+ targetPeer.fsm.lock.RLock()
+ peerVrf := targetPeer.fsm.pConf.Config.Vrf
+ targetPeer.fsm.lock.RUnlock()
+ if peerVrf != "" {
switch family {
case bgp.RF_IPv4_VPN:
return bgp.RF_IPv4_UC
@@ -1047,19 +1133,31 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
switch e.MsgType {
case FSM_MSG_STATE_CHANGE:
nextState := e.MsgData.(bgp.FSMState)
+ peer.fsm.lock.Lock()
oldState := bgp.FSMState(peer.fsm.pConf.State.SessionState.ToInt())
peer.fsm.pConf.State.SessionState = config.IntToSessionStateMap[int(nextState)]
+ peer.fsm.lock.Unlock()
+
peer.fsm.StateChange(nextState)
+ peer.fsm.lock.RLock()
+ nextStateIdle := peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE
+ peer.fsm.lock.RUnlock()
+
// PeerDown
if oldState == bgp.BGP_FSM_ESTABLISHED {
t := time.Now()
+ peer.fsm.lock.Lock()
if t.Sub(time.Unix(peer.fsm.pConf.Timers.State.Uptime, 0)) < FLOP_THRESHOLD {
peer.fsm.pConf.State.Flops++
}
+ graceful := peer.fsm.reason.Type == FSM_GRACEFUL_RESTART
+ peer.fsm.lock.Unlock()
var drop []bgp.RouteFamily
- if peer.fsm.reason.Type == FSM_GRACEFUL_RESTART {
+ if graceful {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = true
+ peer.fsm.lock.Unlock()
var p []bgp.RouteFamily
p, drop = peer.forwardingPreservedFamilies()
server.propagateUpdate(peer, peer.StaleAll(p))
@@ -1069,19 +1167,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
peer.prefixLimitWarned = make(map[bgp.RouteFamily]bool)
peer.DropAll(drop)
server.dropPeerAllRoutes(peer, drop)
+
+ peer.fsm.lock.Lock()
if peer.fsm.pConf.Config.PeerAs == 0 {
peer.fsm.pConf.State.PeerAs = 0
peer.fsm.peerInfo.AS = 0
}
+ peer.fsm.lock.Unlock()
+
if peer.isDynamicNeighbor() {
peer.stopPeerRestarting()
go peer.stopFSM()
+ peer.fsm.lock.RLock()
delete(server.neighborMap, peer.fsm.pConf.State.NeighborAddress)
+ peer.fsm.lock.RUnlock()
server.broadcastPeerState(peer, oldState, e)
return
}
- } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE {
- if peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled {
+ } else if nextStateIdle {
+ peer.fsm.lock.RLock()
+ longLivedEnabled := peer.fsm.pConf.GracefulRestart.State.LongLivedEnabled
+ peer.fsm.lock.RUnlock()
+ if longLivedEnabled {
llgr, no_llgr := peer.llgrFamilies()
peer.DropAll(no_llgr)
@@ -1141,7 +1248,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// If the session does not get re-established within the "Restart Time"
// that the peer advertised previously, the Receiving Speaker MUST
// delete all the stale routes from the peer that it is retaining.
+ peer.fsm.lock.Lock()
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false
+ peer.fsm.lock.Unlock()
peer.DropAll(peer.configuredRFlist())
server.dropPeerAllRoutes(peer, peer.configuredRFlist())
}
@@ -1153,19 +1262,25 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
// may include zone info
+ peer.fsm.lock.Lock()
peer.fsm.pConf.Transport.State.LocalAddress = laddr
// exclude zone info
ipaddr, _ := net.ResolveIPAddr("ip", laddr)
peer.fsm.peerInfo.LocalAddress = ipaddr.IP
+ neighborAddress := peer.fsm.pConf.State.NeighborAddress
+ peer.fsm.lock.Unlock()
deferralExpiredFunc := func(family bgp.RouteFamily) func() {
return func() {
server.mgmtOperation(func() error {
- server.softResetOut(peer.fsm.pConf.State.NeighborAddress, family, true)
+ server.softResetOut(neighborAddress, family, true)
return nil
}, false)
}
}
- if !peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+ peer.fsm.lock.RLock()
+ notLocalRestarting := !peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if notLocalRestarting {
// When graceful-restart cap (which means intention
// of sending EOR) and route-target address family are negotiated,
// send route-target NLRIs first, and wait to send others
@@ -1176,8 +1291,12 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// waiting sending non-route-target NLRIs since the peer won't send
// any routes (and EORs) before we send ours (or deferral-timer expires).
var pathList []*table.Path
+ peer.fsm.lock.RLock()
_, y := peer.fsm.rfMap[bgp.RF_RTC_UC]
- if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !peer.fsm.pConf.GracefulRestart.State.PeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 {
+ c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC)
+ notPeerRestarting := !peer.fsm.pConf.GracefulRestart.State.PeerRestarting
+ peer.fsm.lock.RUnlock()
+ if y && notPeerRestarting && c.RouteTargetMembership.Config.DeferralTime > 0 {
pathList, _ = server.getBestFromLocal(peer, []bgp.RouteFamily{bgp.RF_RTC_UC})
t := c.RouteTargetMembership.Config.DeferralTime
for _, f := range peer.negotiatedRFList() {
@@ -1211,7 +1330,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
}()
if allEnd {
for _, p := range server.neighborMap {
+ p.fsm.lock.Lock()
p.fsm.pConf.GracefulRestart.State.LocalRestarting = false
+ p.fsm.lock.Unlock()
if !p.isGracefulRestartEnabled() {
continue
}
@@ -1224,7 +1345,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
"Topic": "Server",
}).Info("sync finished")
} else {
+ peer.fsm.lock.RLock()
deferral := peer.fsm.pConf.GracefulRestart.Config.DeferralTime
+ peer.fsm.lock.RUnlock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -1236,7 +1359,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
if server.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE {
die := true
for _, p := range server.neighborMap {
- if p.fsm.state != bgp.BGP_FSM_IDLE {
+ p.fsm.lock.RLock()
+ stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE
+ p.fsm.lock.RUnlock()
+ if stateNotIdle {
die = false
break
}
@@ -1245,19 +1371,30 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
server.shutdownWG.Done()
}
}
+ peer.fsm.lock.Lock()
peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix()
+ peer.fsm.lock.Unlock()
}
// clear counter
- if peer.fsm.adminState == ADMIN_STATE_DOWN {
+ peer.fsm.lock.RLock()
+ adminStateDown := peer.fsm.adminState == ADMIN_STATE_DOWN
+ peer.fsm.lock.RUnlock()
+ if adminStateDown {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.State = config.NeighborState{}
peer.fsm.pConf.State.NeighborAddress = peer.fsm.pConf.Config.NeighborAddress
peer.fsm.pConf.State.PeerAs = peer.fsm.pConf.Config.PeerAs
peer.fsm.pConf.Timers.State = config.TimersState{}
+ peer.fsm.lock.Unlock()
}
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, oldState, e)
case FSM_MSG_ROUTE_REFRESH:
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime
+ peer.fsm.lock.RUnlock()
+ if notEstablished || beforeUptime {
return
}
if paths := server.handleRouteRefresh(peer, e); len(paths) > 0 {
@@ -1271,7 +1408,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
return
case *bgp.BGPMessage:
server.notifyRecvMessageWatcher(peer, e.timestamp, m)
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED || e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ beforeUptime := e.timestamp.Unix() < peer.fsm.pConf.Timers.State.Uptime
+ peer.fsm.lock.RUnlock()
+ if notEstablished || beforeUptime {
return
}
pathList, eor, notification := peer.handleUpdate(e)
@@ -1287,15 +1428,20 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
server.propagateUpdate(peer, pathList)
}
+ peer.fsm.lock.RLock()
+ peerAfiSafis := peer.fsm.pConf.AfiSafis
+ peer.fsm.lock.RUnlock()
if len(eor) > 0 {
rtc := false
for _, f := range eor {
if f == bgp.RF_RTC_UC {
rtc = true
}
- for i, a := range peer.fsm.pConf.AfiSafis {
+ for i, a := range peerAfiSafis {
if a.State.Family == f {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.AfiSafis[i].MpGracefulRestart.State.EndOfRibReceived = true
+ peer.fsm.lock.Unlock()
}
}
}
@@ -1307,7 +1453,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// End-of-RIB marker from all its peers (excluding the ones with the
// "Restart State" bit set in the received capability and excluding the
// ones that do not advertise the graceful restart capability) or ...snip...
- if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+
+ peer.fsm.lock.RLock()
+ localRestarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if localRestarting {
allEnd := func() bool {
for _, p := range server.neighborMap {
if !p.recvedAllEOR() {
@@ -1318,7 +1468,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
}()
if allEnd {
for _, p := range server.neighborMap {
+ p.fsm.lock.Lock()
p.fsm.pConf.GracefulRestart.State.LocalRestarting = false
+ p.fsm.lock.Unlock()
if !p.isGracefulRestartEnabled() {
continue
}
@@ -1336,14 +1488,19 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// we don't delay non-route-target NLRIs when local-restarting
rtc = false
}
- if peer.fsm.pConf.GracefulRestart.State.PeerRestarting {
+ peer.fsm.lock.RLock()
+ peerRestarting := peer.fsm.pConf.GracefulRestart.State.PeerRestarting
+ peer.fsm.lock.RUnlock()
+ if peerRestarting {
if peer.recvedAllEOR() {
peer.stopPeerRestarting()
pathList := peer.adjRibIn.DropStale(peer.configuredRFlist())
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Debugf("withdraw %d stale routes", len(pathList))
+ peer.fsm.lock.RUnlock()
server.propagateUpdate(peer, pathList)
}
@@ -1353,7 +1510,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
// received EOR of route-target address family
// outbound filter is now ready, let's flash non-route-target NLRIs
- if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 {
+ peer.fsm.lock.RLock()
+ c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC)
+ peer.fsm.lock.RUnlock()
+ if rtc && c != nil && c.RouteTargetMembership.Config.DeferralTime > 0 {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -1442,11 +1602,13 @@ func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) error {
ap := make(map[string]config.ApplyPolicy, len(s.neighborMap)+1)
ap[table.GLOBAL_RIB_NAME] = s.bgpConfig.Global.ApplyPolicy
for _, peer := range s.neighborMap {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Info("call set policy")
ap[peer.ID()] = peer.fsm.pConf.ApplyPolicy
+ peer.fsm.lock.RUnlock()
}
return s.policy.Reset(&policy, ap)
}, false)
@@ -1731,7 +1893,10 @@ func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInte
func (s *BgpServer) DeleteVrf(name string) error {
return s.mgmtOperation(func() error {
for _, n := range s.neighborMap {
- if n.fsm.pConf.Config.Vrf == name {
+ n.fsm.lock.RLock()
+ peerVrf := n.fsm.pConf.Config.Vrf
+ n.fsm.lock.RUnlock()
+ if peerVrf == name {
return fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID())
}
}
@@ -1792,7 +1957,11 @@ func (s *BgpServer) softResetIn(addr string, family bgp.RouteFamily) error {
// route should be excluded from the Phase 2 decision function.
isLooped := false
if aspath := path.GetAsPath(); aspath != nil {
- isLooped = hasOwnASLoop(peer.fsm.peerInfo.LocalAS, int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs), aspath)
+ peer.fsm.lock.RLock()
+ localAS := peer.fsm.peerInfo.LocalAS
+ allowOwnAS := int(peer.fsm.pConf.AsPathOptions.Config.AllowOwnAs)
+ peer.fsm.lock.RUnlock()
+ isLooped = hasOwnASLoop(localAS, allowOwnAS, aspath)
}
if path.IsAsLooped() != isLooped {
// can't modify the existing one. needs to create one
@@ -1816,21 +1985,30 @@ func (s *BgpServer) softResetOut(addr string, family bgp.RouteFamily, deferral b
return err
}
for _, peer := range peers {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
continue
}
families := familiesForSoftreset(peer, family)
if deferral {
+ peer.fsm.lock.RLock()
_, y := peer.fsm.rfMap[bgp.RF_RTC_UC]
- if peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
+ c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC)
+ restarting := peer.fsm.pConf.GracefulRestart.State.LocalRestarting
+ peer.fsm.lock.RUnlock()
+ if restarting {
+ peer.fsm.lock.Lock()
peer.fsm.pConf.GracefulRestart.State.LocalRestarting = false
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
"Families": families,
}).Debug("deferral timer expired")
- } else if c := peer.fsm.pConf.GetAfiSafi(bgp.RF_RTC_UC); y && !c.MpGracefulRestart.State.EndOfRibReceived {
+ peer.fsm.lock.Unlock()
+ } else if y && !c.MpGracefulRestart.State.EndOfRibReceived {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
@@ -2038,7 +2216,10 @@ func (s *BgpServer) GetNeighbor(address string, getAdvertised bool) (l []*config
s.mgmtOperation(func() error {
l = make([]*config.Neighbor, 0, len(s.neighborMap))
for k, peer := range s.neighborMap {
- if address != "" && address != k && address != peer.fsm.pConf.Config.NeighborInterface {
+ peer.fsm.lock.RLock()
+ neighborIface := peer.fsm.pConf.Config.NeighborInterface
+ peer.fsm.lock.RUnlock()
+ if address != "" && address != k && address != neighborIface {
continue
}
l = append(l, s.ToConfig(peer, getAdvertised))
@@ -2222,7 +2403,10 @@ func (s *BgpServer) DeletePeerGroup(c *config.PeerGroup) error {
return s.mgmtOperation(func() error {
name := c.Config.PeerGroupName
for _, n := range s.neighborMap {
- if n.fsm.pConf.Config.PeerGroup == name {
+ n.fsm.lock.RLock()
+ peerGroup := n.fsm.pConf.Config.PeerGroup
+ n.fsm.lock.RUnlock()
+ if peerGroup == name {
return fmt.Errorf("failed to delete peer-group %s: neighbor %s is in use", name, n.ID())
}
}
@@ -2411,7 +2595,9 @@ func (s *BgpServer) ResetNeighbor(addr, communication string) error {
}
peers, _ := s.addrToPeers(addr)
for _, peer := range peers {
+ peer.fsm.lock.Lock()
peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset
+ peer.fsm.lock.Unlock()
}
return nil
}, true)
@@ -2426,12 +2612,16 @@ func (s *BgpServer) setAdminState(addr, communication string, enable bool) error
f := func(stateOp *AdminStateOperation, message string) {
select {
case peer.fsm.adminStateCh <- *stateOp:
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.fsm.pConf.State.NeighborAddress,
}).Debug(message)
+ peer.fsm.lock.RUnlock()
default:
+ peer.fsm.lock.RLock()
log.Warning("previous request is still remaining. : ", peer.fsm.pConf.State.NeighborAddress)
+ peer.fsm.lock.RUnlock()
}
}
if enable {
@@ -2942,7 +3132,10 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
if w.opts.initPeerState {
for _, peer := range s.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
continue
}
w.notify(newWatchEventPeerState(peer, nil))
@@ -2956,14 +3149,18 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
if w.opts.initUpdate {
for _, peer := range s.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.lock.RLock()
+ notEstablished := peer.fsm.state != bgp.BGP_FSM_ESTABLISHED
+ peer.fsm.lock.RUnlock()
+ if notEstablished {
continue
}
configNeighbor := w.s.ToConfig(peer, false)
for _, rf := range peer.configuredRFlist() {
+ peer.fsm.lock.RLock()
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- w.notify(&WatchEventUpdate{
+ update := &WatchEventUpdate{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
@@ -2974,11 +3171,14 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
PostPolicy: false,
Neighbor: configNeighbor,
PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false),
- })
+ }
+ peer.fsm.lock.RUnlock()
+ w.notify(update)
eor := bgp.NewEndOfRib(rf)
eorBuf, _ := eor.Serialize()
- w.notify(&WatchEventUpdate{
+ peer.fsm.lock.RLock()
+ update = &WatchEventUpdate{
Message: eor,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
@@ -2991,7 +3191,9 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
Payload: eorBuf,
PostPolicy: false,
Neighbor: configNeighbor,
- })
+ }
+ peer.fsm.lock.RUnlock()
+ w.notify(update)
}
}
}