diff options
author | jhserrano <jhserrano.github@gmail.com> | 2018-07-09 19:48:57 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-07-19 22:23:29 +0900 |
commit | fb999f325b7b160df371145eb6fb29fbb5c73f21 (patch) | |
tree | 40406cf792a56e41662b6d2e07dd8648064c5aa2 /pkg/server/fsm.go | |
parent | 695fb5298edf8f912a9d11922e96f23c6464ba58 (diff) |
fix races and enable race detector in unittest
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg/server/fsm.go')
-rw-r--r-- | pkg/server/fsm.go | 218 |
1 file changed, 200 insertions, 18 deletions
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go index d0cc4f86..073cd88f 100644 --- a/pkg/server/fsm.go +++ b/pkg/server/fsm.go @@ -21,6 +21,7 @@ import ( "math/rand" "net" "strconv" + "sync" "time" "github.com/eapache/channels" @@ -193,6 +194,7 @@ type FSM struct { t tomb.Tomb gConf *config.Global pConf *config.Neighbor + lock sync.RWMutex state bgp.FSMState reason *FsmStateReason conn net.Conn @@ -215,6 +217,8 @@ type FSM struct { } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { + fsm.lock.Lock() + defer fsm.lock.Unlock() state := &fsm.pConf.State.Messages timer := &fsm.pConf.Timers if isIn { @@ -264,6 +268,8 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } func (fsm *FSM) bmpStatsUpdate(statType uint16, increment int) { + fsm.lock.Lock() + defer fsm.lock.Unlock() stats := &fsm.pConf.State.Messages.Received switch statType { // TODO @@ -305,6 +311,9 @@ func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP } func (fsm *FSM) StateChange(nextState bgp.FSMState) { + fsm.lock.Lock() + defer fsm.lock.Unlock() + log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -367,7 +376,11 @@ func (fsm *FSM) LocalHostPort() (string, uint16) { } func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) (*bgp.BGPMessage, error) { - if fsm.h != nil && fsm.h.conn != nil { + fsm.lock.RLock() + established := fsm.h != nil && fsm.h.conn != nil + fsm.lock.RUnlock() + + if established { m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data) b, _ := m.Serialize() _, err := fsm.h.conn.Write(b) @@ -392,7 +405,9 @@ func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) ( } func (fsm *FSM) connectLoop() error { + fsm.lock.RLock() tick := int(fsm.pConf.Timers.Config.ConnectRetry) + fsm.lock.RUnlock() if tick < MIN_CONNECT_RETRY { tick = MIN_CONNECT_RETRY } @@ -403,6 +418,9 @@ func (fsm *FSM) connectLoop() error { timer.Stop() connect := 
func() { + fsm.lock.RLock() + defer fsm.lock.RUnlock() + addr := fsm.pConf.State.NeighborAddress port := int(bgp.BGP_PORT) if fsm.pConf.Transport.Config.RemotePort != 0 { @@ -460,13 +478,18 @@ func (fsm *FSM) connectLoop() error { for { select { case <-fsm.t.Dying(): + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, }).Debug("stop connect loop") + fsm.lock.RUnlock() return nil case <-timer.C: - if fsm.state == bgp.BGP_FSM_ACTIVE { + fsm.lock.RLock() + ready := fsm.state == bgp.BGP_FSM_ACTIVE + fsm.lock.RUnlock() + if ready { go connect() } case <-fsm.getActiveCh: @@ -504,18 +527,27 @@ func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *F func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm + fsm.lock.RLock() idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime)) + fsm.lock.RUnlock() + for { select { case <-h.t.Dying(): return -1, NewFsmStateReason(FSM_DYING, nil, nil) case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } case conn, ok := <-fsm.connCh: @@ -523,20 +555,28 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() + case <-idleHoldTimer.C: + fsm.lock.RLock() + adminStateUp := fsm.adminState == ADMIN_STATE_UP + fsm.lock.RUnlock() - if fsm.adminState == ADMIN_STATE_UP { + if adminStateUp 
{ + fsm.lock.Lock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "Duration": fsm.idleHoldTime, }).Debug("IdleHoldTimer expired") fsm.idleHoldTime = HOLDTIME_IDLE + fsm.lock.Unlock() return bgp.BGP_FSM_ACTIVE, NewFsmStateReason(FSM_IDLE_HOLD_TIMER_EXPIRED, nil, nil) } else { @@ -553,7 +593,9 @@ func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { case ADMIN_STATE_UP: // restart idle hold timer + fsm.lock.RLock() idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime)) + fsm.lock.RUnlock() } } } @@ -570,9 +612,13 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) { if !ok { break } + fsm.lock.Lock() fsm.conn = conn + fsm.lock.Unlock() ttl := 0 ttlMin := 0 + + fsm.lock.RLock() if fsm.pConf.TtlSecurity.Config.Enabled { ttl = 255 ttlMin = int(fsm.pConf.TtlSecurity.Config.TtlMin) @@ -605,16 +651,22 @@ func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) { }).Warnf("cannot set minimal TTL(=%d) for peer: %s", ttl, err) } } + fsm.lock.RUnlock() // we don't implement delayed open timer so move to opensent right // away. 
return bgp.BGP_FSM_OPENSENT, NewFsmStateReason(FSM_NEW_CONNECTION, nil, nil) case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } case err := <-h.stateReasonCh: @@ -791,6 +843,9 @@ func extractRouteFamily(p *bgp.PathAttributeInterface) *bgp.RouteFamily { } func (h *FSMHandler) afiSafiDisable(rf bgp.RouteFamily) string { + h.fsm.lock.Lock() + defer h.fsm.lock.Unlock() + n := bgp.AddressFamilyNameMap[rf] for i, a := range h.fsm.pConf.AfiSafis { @@ -817,36 +872,44 @@ func (h *FSMHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError b handling = factor.ErrorHandling switch handling { case bgp.ERROR_HANDLING_ATTRIBUTE_DISCARD: + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warn("Some attributes were discarded") + h.fsm.lock.RUnlock() case bgp.ERROR_HANDLING_TREAT_AS_WITHDRAW: m.Body = bgp.TreatAsWithdraw(m.Body.(*bgp.BGPUpdate)) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warn("the received Update message was treated as withdraw") + h.fsm.lock.RUnlock() case bgp.ERROR_HANDLING_AFISAFI_DISABLE: rf := extractRouteFamily(factor.ErrorAttribute) if rf == nil { + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), }).Warn("Error occurred during AFI/SAFI disabling") + h.fsm.lock.RUnlock() } else { n := h.afiSafiDisable(*rf) + h.fsm.lock.RLock() 
log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warnf("Capability %s was disabled", n) + h.fsm.lock.RUnlock() } } } else { @@ -874,6 +937,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { err = hd.DecodeFromBytes(headerBuf) if err != nil { h.fsm.bgpMessageStateUpdate(0, true) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, @@ -886,6 +950,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { MsgData: err, Version: h.fsm.version, } + h.fsm.lock.RUnlock() return fmsg, err } @@ -896,10 +961,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { } now := time.Now() - useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw handling := bgp.ERROR_HANDLING_NONE - m, err := bgp.ParseBGPBody(hd, bodyBuf, h.fsm.marshallingOptions) + h.fsm.lock.RLock() + useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw + options := h.fsm.marshallingOptions + h.fsm.lock.RUnlock() + + m, err := bgp.ParseBGPBody(hd, bodyBuf, options) if err != nil { handling = h.handlingError(m, err, useRevisedError) h.fsm.bgpMessageStateUpdate(0, true) @@ -907,29 +976,38 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { h.fsm.bgpMessageStateUpdate(m.Header.Type, true) err = bgp.ValidateBGPMessage(m) } + h.fsm.lock.RLock() fmsg := &FsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, MsgSrc: h.fsm.pConf.State.NeighborAddress, timestamp: now, Version: h.fsm.version, } + h.fsm.lock.RUnlock() switch handling { case bgp.ERROR_HANDLING_AFISAFI_DISABLE: fmsg.MsgData = m return fmsg, nil case bgp.ERROR_HANDLING_SESSION_RESET: + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP message") + h.fsm.lock.RUnlock() fmsg.MsgData = err return fmsg, err default: 
fmsg.MsgData = m - if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { + + h.fsm.lock.RLock() + establishedState := h.fsm.state == bgp.BGP_FSM_ESTABLISHED + h.fsm.lock.RUnlock() + + if establishedState { switch m.Header.Type { case bgp.BGP_MSG_ROUTE_REFRESH: fmsg.MsgType = FSM_MSG_ROUTE_REFRESH @@ -942,17 +1020,22 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { copy(fmsg.payload, headerBuf) copy(fmsg.payload[len(headerBuf):], bodyBuf) - ok, err := bgp.ValidateUpdateMsg(body, h.fsm.rfMap, isEBGP, isConfed) + h.fsm.lock.RLock() + rfMap := h.fsm.rfMap + h.fsm.lock.RUnlock() + ok, err := bgp.ValidateUpdateMsg(body, rfMap, isEBGP, isConfed) if !ok { handling = h.handlingError(m, err, useRevisedError) } if handling == bgp.ERROR_HANDLING_SESSION_RESET { + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP update message") + h.fsm.lock.RUnlock() fmsg.MsgData = err return fmsg, err } @@ -974,7 +1057,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { return fmsg, err } - fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo, fmsg.timestamp) + h.fsm.lock.RLock() + peerInfo := h.fsm.peerInfo + h.fsm.lock.RUnlock() + fmsg.PathList = table.ProcessMessage(m, peerInfo, fmsg.timestamp) fallthrough case bgp.BGP_MSG_KEEPALIVE: // if the length of h.holdTimerResetCh @@ -991,6 +1077,7 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { body := m.Body.(*bgp.BGPNotification) if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { communication, rest := decodeAdministrativeCommunication(body.Data) + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, @@ -999,7 +1086,9 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) 
{ "Communicated-Reason": communication, "Data": rest, }).Warn("received notification") + h.fsm.lock.RUnlock() } else { + h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, @@ -1007,9 +1096,14 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { "Subcode": body.ErrorSubcode, "Data": body.Data, }).Warn("received notification") + h.fsm.lock.RUnlock() } - if s := h.fsm.pConf.GracefulRestart.State; s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET { + h.fsm.lock.RLock() + s := h.fsm.pConf.GracefulRestart.State + hardReset := s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET + h.fsm.lock.RUnlock() + if hardReset { sendToStateReasonCh(FSM_HARD_RESET, m) } else { sendToStateReasonCh(FSM_NOTIFICATION_RECV, m) @@ -1089,13 +1183,20 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode] func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm + + fsm.lock.RLock() m := buildopen(fsm.gConf, fsm.pConf) + fsm.lock.RUnlock() + b, _ := m.Serialize() fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) h.msgCh = channels.NewInfiniteChannel() + + fsm.lock.RLock() h.conn = fsm.conn + fsm.lock.RUnlock() h.t.Go(h.recvMessage) @@ -1103,7 +1204,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { // sets its HoldTimer to a large value // A HoldTimer value of 4 minutes is suggested as a "large value" // for the HoldTimer + fsm.lock.RLock() holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime)) + fsm.lock.RUnlock() for { select { @@ -1115,18 +1218,25 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), 
}).Warn("Closed an accepted connection") + fsm.lock.RUnlock() case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() h.conn.Close() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } @@ -1139,16 +1249,27 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) if m.Header.Type == bgp.BGP_MSG_OPEN { + fsm.lock.Lock() fsm.recvOpen = m + fsm.lock.Unlock() + body := m.Body.(*bgp.BGPOpen) - peerAs, err := bgp.ValidateOpenMsg(body, fsm.pConf.Config.PeerAs) + + fsm.lock.RLock() + fsmPeerAS := fsm.pConf.Config.PeerAs + fsm.lock.RUnlock() + peerAs, err := bgp.ValidateOpenMsg(body, fsmPeerAS) if err != nil { m, _ := fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError)) return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil) } // ASN negotiation was skipped - if fsm.pConf.Config.PeerAs == 0 { + fsm.lock.RLock() + asnNegotiationSkipped := fsm.pConf.Config.PeerAs == 0 + fsm.lock.RUnlock() + if asnNegotiationSkipped { + fsm.lock.Lock() typ := config.PEER_TYPE_EXTERNAL if fsm.peerInfo.LocalAS == peerAs { typ = config.PEER_TYPE_INTERNAL @@ -1159,9 +1280,13 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Infof("skipped asn negotiation: peer-as: %d, peer-type: %s", peerAs, typ) + fsm.lock.Unlock() } else { + fsm.lock.Lock() fsm.pConf.State.PeerType = fsm.pConf.Config.PeerType + fsm.lock.Unlock() } + fsm.lock.Lock() fsm.pConf.State.PeerAs = peerAs fsm.peerInfo.AS = peerAs fsm.peerInfo.ID = body.ID @@ -1264,6 +1389,7 @@ func (h 
*FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { } } + fsm.lock.Unlock() msg := bgp.NewBGPKeepAliveMessage() b, _ := msg.Serialize() fsm.conn.Write(b) @@ -1313,6 +1439,9 @@ func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { } func keepaliveTicker(fsm *FSM) *time.Ticker { + fsm.lock.RLock() + defer fsm.lock.RUnlock() + negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime if negotiatedTime == 0 { return &time.Ticker{} @@ -1328,6 +1457,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) h.msgCh = channels.NewInfiniteChannel() + fsm.lock.RLock() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -1340,6 +1470,7 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { // sets the HoldTimer according to the negotiated value holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } + fsm.lock.RUnlock() for { select { @@ -1351,18 +1482,25 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() case <-fsm.gracefulRestartTimer.C: - if fsm.pConf.GracefulRestart.State.PeerRestarting { + fsm.lock.RLock() + restarting := fsm.pConf.GracefulRestart.State.PeerRestarting + fsm.lock.RUnlock() + if restarting { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") + fsm.lock.RUnlock() h.conn.Close() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } @@ -1429,6 +1567,7 @@ func (h *FSMHandler) sendMessageloop() error { fsm := h.fsm ticker := keepaliveTicker(fsm) send := func(m *bgp.BGPMessage) error { + fsm.lock.RLock() if fsm.twoByteAsTrans && m.Header.Type == 
bgp.BGP_MSG_UPDATE { log.WithFields(log.Fields{ "Topic": "Peer", @@ -1440,29 +1579,37 @@ func (h *FSMHandler) sendMessageloop() error { table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate)) } b, err := m.Serialize(h.fsm.marshallingOptions) + fsm.lock.RUnlock() if err != nil { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "Data": err, }).Warn("failed to serialize") + fsm.lock.RUnlock() fsm.bgpMessageStateUpdate(0, false) return nil } - if err := conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))); err != nil { + fsm.lock.RLock() + err = conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))) + fsm.lock.RUnlock() + if err != nil { h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil) conn.Close() return fmt.Errorf("failed to set write deadline") } _, err = conn.Write(b) if err != nil { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "Data": err, }).Warn("failed to send") + fsm.lock.RUnlock() h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil) conn.Close() return fmt.Errorf("closed") @@ -1474,6 +1621,7 @@ func (h *FSMHandler) sendMessageloop() error { body := m.Body.(*bgp.BGPNotification) if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { communication, rest := decodeAdministrativeCommunication(body.Data) + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -1483,7 +1631,9 @@ func (h *FSMHandler) sendMessageloop() error { "Communicated-Reason": communication, "Data": rest, }).Warn("sent notification") + fsm.lock.RUnlock() } else { + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": 
"Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -1492,12 +1642,14 @@ func (h *FSMHandler) sendMessageloop() error { "Subcode": body.ErrorSubcode, "Data": body.Data, }).Warn("sent notification") + fsm.lock.RUnlock() } h.stateReasonCh <- *NewFsmStateReason(FSM_NOTIFICATION_SENT, m, nil) conn.Close() return fmt.Errorf("closed") case bgp.BGP_MSG_UPDATE: update := m.Body.(*bgp.BGPUpdate) + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, @@ -1506,13 +1658,16 @@ func (h *FSMHandler) sendMessageloop() error { "withdrawals": update.WithdrawnRoutes, "attributes": update.PathAttributes, }).Debug("sent update") + fsm.lock.RUnlock() default: + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "data": m, }).Debug("sent") + fsm.lock.RUnlock() } return nil } @@ -1523,7 +1678,10 @@ func (h *FSMHandler) sendMessageloop() error { return nil case o := <-h.outgoing.Out(): m := o.(*FsmOutgoingMsg) - for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, h.fsm.marshallingOptions) { + h.fsm.lock.RLock() + options := h.fsm.marshallingOptions + h.fsm.lock.RUnlock() + for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) { if err := send(msg); err != nil { return nil } @@ -1560,7 +1718,9 @@ func (h *FSMHandler) recvMessageloop() error { func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm + fsm.lock.Lock() h.conn = fsm.conn + fsm.lock.Unlock() h.t.Go(h.sendMessageloop) h.msgCh = h.incoming h.t.Go(h.recvMessageloop) @@ -1569,7 +1729,9 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { holdTimer = &time.Timer{} } else { + fsm.lock.RLock() holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) + fsm.lock.RUnlock() } fsm.gracefulRestartTimer.Stop() @@ -1583,14 +1745,17 @@ func (h *FSMHandler) 
established() (bgp.FSMState, *FsmStateReason) { break } conn.Close() + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") + fsm.lock.RUnlock() case err := <-h.stateReasonCh: h.conn.Close() h.t.Kill(nil) + fsm.lock.RLock() if s := fsm.pConf.GracefulRestart.State; s.Enabled && (s.NotificationEnabled && err.Type == FSM_NOTIFICATION_RECV || err.Type == FSM_READ_FAILED || @@ -1603,20 +1768,25 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { }).Info("peer graceful restart") fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second) } + fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, &err case <-holdTimer.C: + fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("hold timer expired") + fsm.lock.RUnlock() m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil) case <-h.holdTimerResetCh: + fsm.lock.RLock() if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 { holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } + fsm.lock.RUnlock() case stateOp := <-fsm.adminStateCh: err := h.changeAdminState(stateOp.State) if err == nil { @@ -1633,12 +1803,17 @@ func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { func (h *FSMHandler) loop() error { fsm := h.fsm ch := make(chan bgp.FSMState) + fsm.lock.RLock() oldState := fsm.state + fsm.lock.RUnlock() var reason *FsmStateReason f := func() error { nextState := bgp.FSMState(-1) - switch fsm.state { + fsm.lock.RLock() + fsmState := fsm.state + fsm.lock.RUnlock() + switch fsmState { case bgp.BGP_FSM_IDLE: nextState, reason = h.idle() // case bgp.BGP_FSM_CONNECT: @@ -1661,6 
+1836,7 @@ func (h *FSMHandler) loop() error { nextState := <-ch + fsm.lock.RLock() if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM { log.WithFields(log.Fields{ "Topic": "Peer", @@ -1685,6 +1861,7 @@ func (h *FSMHandler) loop() error { "Reason": reason.String(), }).Info("Peer Down") } + fsm.lock.RUnlock() e := time.AfterFunc(time.Second*120, func() { log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState) @@ -1694,6 +1871,7 @@ func (h *FSMHandler) loop() error { // under zero means that tomb.Dying() if nextState >= bgp.BGP_FSM_IDLE { + fsm.lock.RLock() h.stateCh <- &FsmMsg{ MsgType: FSM_MSG_STATE_CHANGE, MsgSrc: fsm.pConf.State.NeighborAddress, @@ -1701,11 +1879,15 @@ func (h *FSMHandler) loop() error { StateReason: reason, Version: h.fsm.version, } + fsm.lock.RUnlock() } return nil } func (h *FSMHandler) changeAdminState(s AdminState) error { + h.fsm.lock.Lock() + defer h.fsm.lock.Unlock() + fsm := h.fsm if fsm.adminState != s { log.WithFields(log.Fields{ |