diff options
author | FUJITA Tomonori <fujita.tomonori@gmail.com> | 2019-03-23 20:48:18 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@gmail.com> | 2019-03-24 20:34:21 +0900 |
commit | c6aef93ef141b7eba7d682bf3e52cc82b41b926b (patch) | |
tree | 4a32f87754e7acea27862a593eade2d9d6ddc201 /pkg/server/fsm.go | |
parent | ddf9e5572f144ebac031e4128c8e3432470c6ef7 (diff) |
fix race of peer deletion
Fixed a race bug that causes the unittest failure. Also fixed
StopBgp() to block until all the peers are deleted cleanly.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@gmail.com>
Diffstat (limited to 'pkg/server/fsm.go')
-rw-r--r-- | pkg/server/fsm.go | 49 |
1 files changed, 26 insertions, 23 deletions
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go index 6eadfa92..e66054c7 100644 --- a/pkg/server/fsm.go +++ b/pkg/server/fsm.go @@ -123,13 +123,13 @@ const ( type fsmMsg struct { MsgType fsmMsgType + fsm *fsm MsgSrc string MsgData interface{} StateReason *fsmStateReason PathList []*table.Path timestamp time.Time payload []byte - Version uint } type fsmOutgoingMsg struct { @@ -169,13 +169,13 @@ type adminStateOperation struct { Communication []byte } -var fsmVersion uint - type fsm struct { gConf *config.Global pConf *config.Neighbor lock sync.RWMutex state bgp.FSMState + outgoingCh *channels.InfiniteChannel + incomingCh *channels.InfiniteChannel reason *fsmStateReason conn net.Conn connCh chan net.Conn @@ -191,8 +191,8 @@ type fsm struct { policy *table.RoutingPolicy gracefulRestartTimer *time.Timer twoByteAsTrans bool - version uint marshallingOptions *bgp.MarshallingOption + notification chan *bgp.BGPMessage } func (fsm *fsm) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -267,11 +267,12 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP } pConf.State.SessionState = config.IntToSessionStateMap[int(bgp.BGP_FSM_IDLE)] pConf.Timers.State.Downtime = time.Now().Unix() - fsmVersion++ fsm := &fsm{ gConf: gConf, pConf: pConf, state: bgp.BGP_FSM_IDLE, + outgoingCh: channels.NewInfiniteChannel(), + incomingCh: channels.NewInfiniteChannel(), connCh: make(chan net.Conn, 1), opensentHoldTime: float64(holdtimeOpensent), adminState: adminState, @@ -281,7 +282,7 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP peerInfo: table.NewPeerInfo(gConf, pConf), policy: policy, gracefulRestartTimer: time.NewTimer(time.Hour), - version: fsmVersion, + notification: make(chan *bgp.BGPMessage, 1), } fsm.gracefulRestartTimer.Stop() return fsm @@ -382,7 +383,6 @@ type fsmHandler struct { msgCh *channels.InfiniteChannel stateReasonCh chan fsmStateReason incoming *channels.InfiniteChannel - stateCh chan *fsmMsg outgoing *channels.InfiniteChannel holdTimerResetCh chan bool sentNotification *bgp.BGPMessage @@ -391,13 +391,12 @@ type fsmHandler struct { wg *sync.WaitGroup } -func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *fsmMsg, outgoing *channels.InfiniteChannel) *fsmHandler { +func newFSMHandler(fsm *fsm, outgoing *channels.InfiniteChannel) *fsmHandler { ctx, cancel := context.WithCancel(context.Background()) h := &fsmHandler{ fsm: fsm, stateReasonCh: make(chan fsmStateReason, 2), - incoming: incoming, - stateCh: stateCh, + incoming: fsm.incomingCh, outgoing: outgoing, holdTimerResetCh: make(chan bool, 2), wg: &sync.WaitGroup{}, @@ -944,10 +943,10 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) { "error": err, }).Warn("Session will be reset due to malformed BGP Header") fmsg := &fsmMsg{ + fsm: h.fsm, MsgType: fsmMsgBGPMessage, MsgSrc: h.fsm.pConf.State.NeighborAddress, MsgData: err, - Version: h.fsm.version, } h.fsm.lock.RUnlock() return fmsg, err @@ -977,10 +976,10 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) { } h.fsm.lock.RLock() fmsg := &fsmMsg{ + fsm: h.fsm, MsgType: fsmMsgBGPMessage, MsgSrc: h.fsm.pConf.State.NeighborAddress, timestamp: now, - Version: h.fsm.version, } h.fsm.lock.RUnlock() @@ -1757,6 +1756,13 @@ func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateRe for { select { case <-ctx.Done(): + select { + case m := <-fsm.notification: + b, _ := m.Serialize(h.fsm.marshallingOptions) + h.conn.Write(b) + default: + // nothing to do + } h.conn.Close() return -1, newfsmStateReason(fsmDying, nil, nil) case conn, ok := <-fsm.connCh: @@ -1881,18 +1887,15 @@ func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error { } fsm.lock.RUnlock() - // under zero means that the context was canceled. - if nextState >= bgp.BGP_FSM_IDLE { - fsm.lock.RLock() - h.stateCh <- &fsmMsg{ - MsgType: fsmMsgStateChange, - MsgSrc: fsm.pConf.State.NeighborAddress, - MsgData: nextState, - StateReason: reason, - Version: h.fsm.version, - } - fsm.lock.RUnlock() + fsm.lock.RLock() + h.incoming.In() <- &fsmMsg{ + fsm: fsm, + MsgType: fsmMsgStateChange, + MsgSrc: fsm.pConf.State.NeighborAddress, + MsgData: nextState, + StateReason: reason, } + fsm.lock.RUnlock() return nil } |