summaryrefslogtreecommitdiffhomepage
path: root/pkg/server/fsm.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/fsm.go')
-rw-r--r--pkg/server/fsm.go218
1 files changed, 200 insertions, 18 deletions
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go
index d0cc4f86..073cd88f 100644
--- a/pkg/server/fsm.go
+++ b/pkg/server/fsm.go
@@ -21,6 +21,7 @@ import (
"math/rand"
"net"
"strconv"
+ "sync"
"time"
"github.com/eapache/channels"
@@ -193,6 +194,7 @@ type FSM struct {
t tomb.Tomb
gConf *config.Global
pConf *config.Neighbor
+ lock sync.RWMutex
state bgp.FSMState
reason *FsmStateReason
conn net.Conn
@@ -215,6 +217,8 @@ type FSM struct {
}
func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
+ fsm.lock.Lock()
+ defer fsm.lock.Unlock()
state := &fsm.pConf.State.Messages
timer := &fsm.pConf.Timers
if isIn {
@@ -264,6 +268,8 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
func (fsm *FSM) bmpStatsUpdate(statType uint16, increment int) {
+ fsm.lock.Lock()
+ defer fsm.lock.Unlock()
stats := &fsm.pConf.State.Messages.Received
switch statType {
// TODO
@@ -305,6 +311,9 @@ func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
}
func (fsm *FSM) StateChange(nextState bgp.FSMState) {
+ fsm.lock.Lock()
+ defer fsm.lock.Unlock()
+
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -367,7 +376,11 @@ func (fsm *FSM) LocalHostPort() (string, uint16) {
}
func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) (*bgp.BGPMessage, error) {
- if fsm.h != nil && fsm.h.conn != nil {
+ fsm.lock.RLock()
+ established := fsm.h != nil && fsm.h.conn != nil
+ fsm.lock.RUnlock()
+
+ if established {
m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data)
b, _ := m.Serialize()
_, err := fsm.h.conn.Write(b)
@@ -392,7 +405,9 @@ func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) (
}
func (fsm *FSM) connectLoop() error {
+ fsm.lock.RLock()
tick := int(fsm.pConf.Timers.Config.ConnectRetry)
+ fsm.lock.RUnlock()
if tick < MIN_CONNECT_RETRY {
tick = MIN_CONNECT_RETRY
}
@@ -403,6 +418,9 @@ func (fsm *FSM) connectLoop() error {
timer.Stop()
connect := func() {
+ fsm.lock.RLock()
+ defer fsm.lock.RUnlock()
+
addr := fsm.pConf.State.NeighborAddress
port := int(bgp.BGP_PORT)
if fsm.pConf.Transport.Config.RemotePort != 0 {
@@ -460,13 +478,18 @@ func (fsm *FSM) connectLoop() error {
for {
select {
case <-fsm.t.Dying():
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
}).Debug("stop connect loop")
+ fsm.lock.RUnlock()
return nil
case <-timer.C:
- if fsm.state == bgp.BGP_FSM_ACTIVE {
+ fsm.lock.RLock()
+ ready := fsm.state == bgp.BGP_FSM_ACTIVE
+ fsm.lock.RUnlock()
+ if ready {
go connect()
}
case <-fsm.getActiveCh:
@@ -504,18 +527,27 @@ func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *F
func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
+ fsm.lock.RLock()
idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime))
+ fsm.lock.RUnlock()
+
for {
select {
case <-h.t.Dying():
return -1, NewFsmStateReason(FSM_DYING, nil, nil)
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
case conn, ok := <-fsm.connCh:
@@ -523,20 +555,28 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
+
case <-idleHoldTimer.C:
+ fsm.lock.RLock()
+ adminStateUp := fsm.adminState == ADMIN_STATE_UP
+ fsm.lock.RUnlock()
- if fsm.adminState == ADMIN_STATE_UP {
+ if adminStateUp {
+ fsm.lock.Lock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"Duration": fsm.idleHoldTime,
}).Debug("IdleHoldTimer expired")
fsm.idleHoldTime = HOLDTIME_IDLE
+ fsm.lock.Unlock()
return bgp.BGP_FSM_ACTIVE, NewFsmStateReason(FSM_IDLE_HOLD_TIMER_EXPIRED, nil, nil)
} else {
@@ -553,7 +593,9 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) {
case ADMIN_STATE_UP:
// restart idle hold timer
+ fsm.lock.RLock()
idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime))
+ fsm.lock.RUnlock()
}
}
}
@@ -570,9 +612,13 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) {
if !ok {
break
}
+ fsm.lock.Lock()
fsm.conn = conn
+ fsm.lock.Unlock()
ttl := 0
ttlMin := 0
+
+ fsm.lock.RLock()
if fsm.pConf.TtlSecurity.Config.Enabled {
ttl = 255
ttlMin = int(fsm.pConf.TtlSecurity.Config.TtlMin)
@@ -605,16 +651,22 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) {
}).Warnf("cannot set minimal TTL(=%d) for peer: %s", ttl, err)
}
}
+ fsm.lock.RUnlock()
// we don't implement delayed open timer so move to opensent right
// away.
return bgp.BGP_FSM_OPENSENT, NewFsmStateReason(FSM_NEW_CONNECTION, nil, nil)
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
case err := <-h.stateReasonCh:
@@ -791,6 +843,9 @@ func extractRouteFamily(p *bgp.PathAttributeInterface) *bgp.RouteFamily {
}
func (h *FSMHandler) afiSafiDisable(rf bgp.RouteFamily) string {
+ h.fsm.lock.Lock()
+ defer h.fsm.lock.Unlock()
+
n := bgp.AddressFamilyNameMap[rf]
for i, a := range h.fsm.pConf.AfiSafis {
@@ -817,36 +872,44 @@ func (h *FSMHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError b
handling = factor.ErrorHandling
switch handling {
case bgp.ERROR_HANDLING_ATTRIBUTE_DISCARD:
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": e,
}).Warn("Some attributes were discarded")
+ h.fsm.lock.RUnlock()
case bgp.ERROR_HANDLING_TREAT_AS_WITHDRAW:
m.Body = bgp.TreatAsWithdraw(m.Body.(*bgp.BGPUpdate))
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": e,
}).Warn("the received Update message was treated as withdraw")
+ h.fsm.lock.RUnlock()
case bgp.ERROR_HANDLING_AFISAFI_DISABLE:
rf := extractRouteFamily(factor.ErrorAttribute)
if rf == nil {
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
}).Warn("Error occurred during AFI/SAFI disabling")
+ h.fsm.lock.RUnlock()
} else {
n := h.afiSafiDisable(*rf)
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": e,
}).Warnf("Capability %s was disabled", n)
+ h.fsm.lock.RUnlock()
}
}
} else {
@@ -874,6 +937,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
err = hd.DecodeFromBytes(headerBuf)
if err != nil {
h.fsm.bgpMessageStateUpdate(0, true)
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
@@ -886,6 +950,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
MsgData: err,
Version: h.fsm.version,
}
+ h.fsm.lock.RUnlock()
return fmsg, err
}
@@ -896,10 +961,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
}
now := time.Now()
- useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw
handling := bgp.ERROR_HANDLING_NONE
- m, err := bgp.ParseBGPBody(hd, bodyBuf, h.fsm.marshallingOptions)
+ h.fsm.lock.RLock()
+ useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw
+ options := h.fsm.marshallingOptions
+ h.fsm.lock.RUnlock()
+
+ m, err := bgp.ParseBGPBody(hd, bodyBuf, options)
if err != nil {
handling = h.handlingError(m, err, useRevisedError)
h.fsm.bgpMessageStateUpdate(0, true)
@@ -907,29 +976,38 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
h.fsm.bgpMessageStateUpdate(m.Header.Type, true)
err = bgp.ValidateBGPMessage(m)
}
+ h.fsm.lock.RLock()
fmsg := &FsmMsg{
MsgType: FSM_MSG_BGP_MESSAGE,
MsgSrc: h.fsm.pConf.State.NeighborAddress,
timestamp: now,
Version: h.fsm.version,
}
+ h.fsm.lock.RUnlock()
switch handling {
case bgp.ERROR_HANDLING_AFISAFI_DISABLE:
fmsg.MsgData = m
return fmsg, nil
case bgp.ERROR_HANDLING_SESSION_RESET:
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": err,
}).Warn("Session will be reset due to malformed BGP message")
+ h.fsm.lock.RUnlock()
fmsg.MsgData = err
return fmsg, err
default:
fmsg.MsgData = m
- if h.fsm.state == bgp.BGP_FSM_ESTABLISHED {
+
+ h.fsm.lock.RLock()
+ establishedState := h.fsm.state == bgp.BGP_FSM_ESTABLISHED
+ h.fsm.lock.RUnlock()
+
+ if establishedState {
switch m.Header.Type {
case bgp.BGP_MSG_ROUTE_REFRESH:
fmsg.MsgType = FSM_MSG_ROUTE_REFRESH
@@ -942,17 +1020,22 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
copy(fmsg.payload, headerBuf)
copy(fmsg.payload[len(headerBuf):], bodyBuf)
- ok, err := bgp.ValidateUpdateMsg(body, h.fsm.rfMap, isEBGP, isConfed)
+ h.fsm.lock.RLock()
+ rfMap := h.fsm.rfMap
+ h.fsm.lock.RUnlock()
+ ok, err := bgp.ValidateUpdateMsg(body, rfMap, isEBGP, isConfed)
if !ok {
handling = h.handlingError(m, err, useRevisedError)
}
if handling == bgp.ERROR_HANDLING_SESSION_RESET {
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
"State": h.fsm.state.String(),
"error": err,
}).Warn("Session will be reset due to malformed BGP update message")
+ h.fsm.lock.RUnlock()
fmsg.MsgData = err
return fmsg, err
}
@@ -974,7 +1057,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
return fmsg, err
}
- fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo, fmsg.timestamp)
+ h.fsm.lock.RLock()
+ peerInfo := h.fsm.peerInfo
+ h.fsm.lock.RUnlock()
+ fmsg.PathList = table.ProcessMessage(m, peerInfo, fmsg.timestamp)
fallthrough
case bgp.BGP_MSG_KEEPALIVE:
// if the length of h.holdTimerResetCh
@@ -991,6 +1077,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
body := m.Body.(*bgp.BGPNotification)
if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) {
communication, rest := decodeAdministrativeCommunication(body.Data)
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
@@ -999,7 +1086,9 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
"Communicated-Reason": communication,
"Data": rest,
}).Warn("received notification")
+ h.fsm.lock.RUnlock()
} else {
+ h.fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": h.fsm.pConf.State.NeighborAddress,
@@ -1007,9 +1096,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
"Subcode": body.ErrorSubcode,
"Data": body.Data,
}).Warn("received notification")
+ h.fsm.lock.RUnlock()
}
- if s := h.fsm.pConf.GracefulRestart.State; s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET {
+ h.fsm.lock.RLock()
+ s := h.fsm.pConf.GracefulRestart.State
+ hardReset := s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET
+ h.fsm.lock.RUnlock()
+ if hardReset {
sendToStateReasonCh(FSM_HARD_RESET, m)
} else {
sendToStateReasonCh(FSM_NOTIFICATION_RECV, m)
@@ -1089,13 +1183,20 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode]
func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
+
+ fsm.lock.RLock()
m := buildopen(fsm.gConf, fsm.pConf)
+ fsm.lock.RUnlock()
+
b, _ := m.Serialize()
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
h.msgCh = channels.NewInfiniteChannel()
+
+ fsm.lock.RLock()
h.conn = fsm.conn
+ fsm.lock.RUnlock()
h.t.Go(h.recvMessage)
@@ -1103,7 +1204,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
// sets its HoldTimer to a large value
// A HoldTimer value of 4 minutes is suggested as a "large value"
// for the HoldTimer
+ fsm.lock.RLock()
holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime))
+ fsm.lock.RUnlock()
for {
select {
@@ -1115,18 +1218,25 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
h.conn.Close()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
@@ -1139,16 +1249,27 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
if m.Header.Type == bgp.BGP_MSG_OPEN {
+ fsm.lock.Lock()
fsm.recvOpen = m
+ fsm.lock.Unlock()
+
body := m.Body.(*bgp.BGPOpen)
- peerAs, err := bgp.ValidateOpenMsg(body, fsm.pConf.Config.PeerAs)
+
+ fsm.lock.RLock()
+ fsmPeerAS := fsm.pConf.Config.PeerAs
+ fsm.lock.RUnlock()
+ peerAs, err := bgp.ValidateOpenMsg(body, fsmPeerAS)
if err != nil {
m, _ := fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError))
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil)
}
// ASN negotiation was skipped
- if fsm.pConf.Config.PeerAs == 0 {
+ fsm.lock.RLock()
+ asnNegotiationSkipped := fsm.pConf.Config.PeerAs == 0
+ fsm.lock.RUnlock()
+ if asnNegotiationSkipped {
+ fsm.lock.Lock()
typ := config.PEER_TYPE_EXTERNAL
if fsm.peerInfo.LocalAS == peerAs {
typ = config.PEER_TYPE_INTERNAL
@@ -1159,9 +1280,13 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Infof("skipped asn negotiation: peer-as: %d, peer-type: %s", peerAs, typ)
+ fsm.lock.Unlock()
} else {
+ fsm.lock.Lock()
fsm.pConf.State.PeerType = fsm.pConf.Config.PeerType
+ fsm.lock.Unlock()
}
+ fsm.lock.Lock()
fsm.pConf.State.PeerAs = peerAs
fsm.peerInfo.AS = peerAs
fsm.peerInfo.ID = body.ID
@@ -1264,6 +1389,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
}
}
+ fsm.lock.Unlock()
msg := bgp.NewBGPKeepAliveMessage()
b, _ := msg.Serialize()
fsm.conn.Write(b)
@@ -1313,6 +1439,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
}
func keepaliveTicker(fsm *FSM) *time.Ticker {
+ fsm.lock.RLock()
+ defer fsm.lock.RUnlock()
+
negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime
if negotiatedTime == 0 {
return &time.Ticker{}
@@ -1328,6 +1457,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
h.msgCh = channels.NewInfiniteChannel()
+ fsm.lock.RLock()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -1340,6 +1470,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
// sets the HoldTimer according to the negotiated value
holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
}
+ fsm.lock.RUnlock()
for {
select {
@@ -1351,18 +1482,25 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
case <-fsm.gracefulRestartTimer.C:
- if fsm.pConf.GracefulRestart.State.PeerRestarting {
+ fsm.lock.RLock()
+ restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
+ fsm.lock.RUnlock()
+ if restarting {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("graceful restart timer expired")
+ fsm.lock.RUnlock()
h.conn.Close()
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
}
@@ -1429,6 +1567,7 @@ func (h *FSMHandler) sendMessageloop() error {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
send := func(m *bgp.BGPMessage) error {
+ fsm.lock.RLock()
if fsm.twoByteAsTrans && m.Header.Type == bgp.BGP_MSG_UPDATE {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -1440,29 +1579,37 @@ func (h *FSMHandler) sendMessageloop() error {
table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate))
}
b, err := m.Serialize(h.fsm.marshallingOptions)
+ fsm.lock.RUnlock()
if err != nil {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
"Data": err,
}).Warn("failed to serialize")
+ fsm.lock.RUnlock()
fsm.bgpMessageStateUpdate(0, false)
return nil
}
- if err := conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))); err != nil {
+ fsm.lock.RLock()
+ err = conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)))
+ fsm.lock.RUnlock()
+ if err != nil {
h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil)
conn.Close()
return fmt.Errorf("failed to set write deadline")
}
_, err = conn.Write(b)
if err != nil {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
"Data": err,
}).Warn("failed to send")
+ fsm.lock.RUnlock()
h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil)
conn.Close()
return fmt.Errorf("closed")
@@ -1474,6 +1621,7 @@ func (h *FSMHandler) sendMessageloop() error {
body := m.Body.(*bgp.BGPNotification)
if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) {
communication, rest := decodeAdministrativeCommunication(body.Data)
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -1483,7 +1631,9 @@ func (h *FSMHandler) sendMessageloop() error {
"Communicated-Reason": communication,
"Data": rest,
}).Warn("sent notification")
+ fsm.lock.RUnlock()
} else {
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -1492,12 +1642,14 @@ func (h *FSMHandler) sendMessageloop() error {
"Subcode": body.ErrorSubcode,
"Data": body.Data,
}).Warn("sent notification")
+ fsm.lock.RUnlock()
}
h.stateReasonCh <- *NewFsmStateReason(FSM_NOTIFICATION_SENT, m, nil)
conn.Close()
return fmt.Errorf("closed")
case bgp.BGP_MSG_UPDATE:
update := m.Body.(*bgp.BGPUpdate)
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
@@ -1506,13 +1658,16 @@ func (h *FSMHandler) sendMessageloop() error {
"withdrawals": update.WithdrawnRoutes,
"attributes": update.PathAttributes,
}).Debug("sent update")
+ fsm.lock.RUnlock()
default:
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
"data": m,
}).Debug("sent")
+ fsm.lock.RUnlock()
}
return nil
}
@@ -1523,7 +1678,10 @@ func (h *FSMHandler) sendMessageloop() error {
return nil
case o := <-h.outgoing.Out():
m := o.(*FsmOutgoingMsg)
- for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, h.fsm.marshallingOptions) {
+ h.fsm.lock.RLock()
+ options := h.fsm.marshallingOptions
+ h.fsm.lock.RUnlock()
+ for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
if err := send(msg); err != nil {
return nil
}
@@ -1560,7 +1718,9 @@ func (h *FSMHandler) recvMessageloop() error {
func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
fsm := h.fsm
+ fsm.lock.Lock()
h.conn = fsm.conn
+ fsm.lock.Unlock()
h.t.Go(h.sendMessageloop)
h.msgCh = h.incoming
h.t.Go(h.recvMessageloop)
@@ -1569,7 +1729,9 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
holdTimer = &time.Timer{}
} else {
+ fsm.lock.RLock()
holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
+ fsm.lock.RUnlock()
}
fsm.gracefulRestartTimer.Stop()
@@ -1583,14 +1745,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
break
}
conn.Close()
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("Closed an accepted connection")
+ fsm.lock.RUnlock()
case err := <-h.stateReasonCh:
h.conn.Close()
h.t.Kill(nil)
+ fsm.lock.RLock()
if s := fsm.pConf.GracefulRestart.State; s.Enabled &&
(s.NotificationEnabled && err.Type == FSM_NOTIFICATION_RECV ||
err.Type == FSM_READ_FAILED ||
@@ -1603,20 +1768,25 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
}).Info("peer graceful restart")
fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second)
}
+ fsm.lock.RUnlock()
return bgp.BGP_FSM_IDLE, &err
case <-holdTimer.C:
+ fsm.lock.RLock()
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.pConf.State.NeighborAddress,
"State": fsm.state.String(),
}).Warn("hold timer expired")
+ fsm.lock.RUnlock()
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil)
h.outgoing.In() <- &FsmOutgoingMsg{Notification: m}
return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil)
case <-h.holdTimerResetCh:
+ fsm.lock.RLock()
if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 {
holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
}
+ fsm.lock.RUnlock()
case stateOp := <-fsm.adminStateCh:
err := h.changeAdminState(stateOp.State)
if err == nil {
@@ -1633,12 +1803,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
func (h *FSMHandler) loop() error {
fsm := h.fsm
ch := make(chan bgp.FSMState)
+ fsm.lock.RLock()
oldState := fsm.state
+ fsm.lock.RUnlock()
var reason *FsmStateReason
f := func() error {
nextState := bgp.FSMState(-1)
- switch fsm.state {
+ fsm.lock.RLock()
+ fsmState := fsm.state
+ fsm.lock.RUnlock()
+ switch fsmState {
case bgp.BGP_FSM_IDLE:
nextState, reason = h.idle()
// case bgp.BGP_FSM_CONNECT:
@@ -1661,6 +1836,7 @@ func (h *FSMHandler) loop() error {
nextState := <-ch
+ fsm.lock.RLock()
if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -1685,6 +1861,7 @@ func (h *FSMHandler) loop() error {
"Reason": reason.String(),
}).Info("Peer Down")
}
+ fsm.lock.RUnlock()
e := time.AfterFunc(time.Second*120, func() {
log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState)
@@ -1694,6 +1871,7 @@ func (h *FSMHandler) loop() error {
// under zero means that tomb.Dying()
if nextState >= bgp.BGP_FSM_IDLE {
+ fsm.lock.RLock()
h.stateCh <- &FsmMsg{
MsgType: FSM_MSG_STATE_CHANGE,
MsgSrc: fsm.pConf.State.NeighborAddress,
@@ -1701,11 +1879,15 @@ func (h *FSMHandler) loop() error {
StateReason: reason,
Version: h.fsm.version,
}
+ fsm.lock.RUnlock()
}
return nil
}
func (h *FSMHandler) changeAdminState(s AdminState) error {
+ h.fsm.lock.Lock()
+ defer h.fsm.lock.Unlock()
+
fsm := h.fsm
if fsm.adminState != s {
log.WithFields(log.Fields{