diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-01-03 13:09:35 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-01-03 13:09:35 +0900 |
commit | f62b27837e98501743aff07b62cf5ae46804bdd0 (patch) | |
tree | 2b5177141527cc665059b3d7e9f18f9de8695f6b /server | |
parent | 10dfd44a53a3453b4ab83b39bfb5b759b9d43a47 (diff) |
server: merge eventCh and incoming channels
We need the order of events on two channles so this patch merges two
channels. For example, we need to get received OpenMessage before
moving to establish state.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/fsm.go | 53 | ||||
-rw-r--r-- | server/peer.go | 55 |
2 files changed, 64 insertions, 44 deletions
diff --git a/server/fsm.go b/server/fsm.go index 3080dca4..21c05399 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -24,16 +24,28 @@ import ( "time" ) +type fsmMsgType int + +const ( + _ fsmMsgType = iota + FSM_MSG_STATE_CHANGE + FSM_MSG_BGP_MESSAGE +) + +type fsmMsg struct { + MsgType fsmMsgType + MsgData interface{} +} + type FSM struct { globalConfig *config.GlobalType peerConfig *config.NeighborType keepaliveTicker *time.Ticker state bgp.FSMState - incoming chan *bgp.BGPMessage + incoming chan *fsmMsg outgoing chan *bgp.BGPMessage passiveConn *net.TCPConn passiveConnCh chan *net.TCPConn - stateCh chan bgp.FSMState } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -78,7 +90,7 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } } -func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *bgp.BGPMessage, outgoing chan *bgp.BGPMessage) *FSM { +func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSM { return &FSM{ globalConfig: gConfig, peerConfig: pConfig, @@ -86,14 +98,9 @@ func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh cha outgoing: outgoing, state: bgp.BGP_FSM_IDLE, passiveConnCh: connCh, - stateCh: make(chan bgp.FSMState), } } -func (fsm *FSM) StateChanged() chan bgp.FSMState { - return fsm.stateCh -} - func (fsm *FSM) StateChange(nextState bgp.FSMState) { log.Debugf("Peer (%v) state changed from %v to %v", fsm.peerConfig.NeighborAddress, fsm.state, nextState) fsm.state = nextState @@ -103,7 +110,7 @@ type FSMHandler struct { t tomb.Tomb fsm *FSM conn *net.TCPConn - msgCh chan *bgp.BGPMessage + msgCh chan *fsmMsg errorCh chan bool } @@ -208,7 +215,11 @@ func (h *FSMHandler) recvMessageWithError() error { h.errorCh <- true return err } - h.msgCh <- m + e := &fsmMsg{ + MsgType: FSM_MSG_BGP_MESSAGE, + MsgData: m, + } + h.msgCh <- e return nil } @@ -224,7 +235,7 @@ func (h *FSMHandler) opensent() bgp.FSMState { fsm.passiveConn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.msgCh = make(chan *bgp.BGPMessage) + h.msgCh = make(chan *fsmMsg) h.conn = fsm.passiveConn h.t.Go(h.recvMessage) @@ -234,10 +245,15 @@ func (h *FSMHandler) opensent() bgp.FSMState { case <-h.t.Dying(): fsm.passiveConn.Close() return 0 - case m := <-h.msgCh: + case e := <-h.msgCh: + m := e.MsgData.(*bgp.BGPMessage) fsm.bgpMessageStateUpdate(m.Header.Type, true) if m.Header.Type == bgp.BGP_MSG_OPEN { - fsm.incoming <- m + e := &fsmMsg{ + MsgType: FSM_MSG_BGP_MESSAGE, + MsgData: m, + } + fsm.incoming <- e msg := bgp.NewBGPKeepAliveMessage() b, _ := msg.Serialize() fsm.passiveConn.Write(b) @@ -256,7 +272,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval) fsm.keepaliveTicker = time.NewTicker(sec) - h.msgCh = make(chan *bgp.BGPMessage) + h.msgCh = make(chan *fsmMsg) h.conn = fsm.passiveConn h.t.Go(h.recvMessage) @@ -271,7 +287,8 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { b, _ := m.Serialize() // TODO: check error fsm.passiveConn.Write(b) - case m := <-h.msgCh: + case e := <-h.msgCh: + m := e.MsgData.(*bgp.BGPMessage) nextState := bgp.BGP_FSM_IDLE fsm.bgpMessageStateUpdate(m.Header.Type, true) if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { @@ -366,7 +383,11 @@ func (h *FSMHandler) loop() error { // zero means that tomb.Dying() if nextState >= bgp.BGP_FSM_IDLE { - fsm.stateCh <- nextState + e := &fsmMsg{ + MsgType: FSM_MSG_STATE_CHANGE, + MsgData: nextState, + } + fsm.incoming <- e } return nil } diff --git a/server/peer.go b/server/peer.go index 88a327e7..6f18f4ec 100644 --- a/server/peer.go +++ b/server/peer.go @@ -32,7 +32,7 @@ type Peer struct { globalConfig config.GlobalType peerConfig config.NeighborType acceptedConnCh chan *net.TCPConn - incoming chan *bgp.BGPMessage + incoming chan *fsmMsg outgoing chan *bgp.BGPMessage inEventCh chan *message outEventCh chan *message @@ -52,7 +52,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *mes globalConfig: g, peerConfig: peer, acceptedConnCh: make(chan *net.TCPConn), - incoming: make(chan *bgp.BGPMessage, 4096), + incoming: make(chan *fsmMsg, 4096), outgoing: make(chan *bgp.BGPMessage, 4096), inEventCh: make(chan *message, 4096), outEventCh: outEventCh, @@ -114,8 +114,6 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { func (peer *Peer) sendMessages(msgs []*bgp.BGPMessage) { for _, m := range msgs { - // FIXME: there is race where state change - // (established) event arrived before open message if peer.peerConfig.BgpNeighborCommonState.State != uint32(bgp.BGP_FSM_ESTABLISHED) { continue } @@ -175,37 +173,38 @@ func (peer *Peer) loop() error { sameState := true for sameState { select { - case nextState := <-peer.fsm.StateChanged(): - // waits for all goroutines created for the current state - h.Wait() - oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) - peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState) - peer.fsm.StateChange(nextState) - sameState = false - if nextState == bgp.BGP_FSM_ESTABLISHED { - pathList := peer.adjRib.GetOutPathList(peer.rf) - peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) - peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now() - peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ - if oldState >= bgp.BGP_FSM_OPENSENT { - peer.peerInfo.VersionNum++ - } - } - if oldState == bgp.BGP_FSM_ESTABLISHED { - peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{} - peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo) - } case <-peer.t.Dying(): close(peer.acceptedConnCh) h.Stop() close(peer.incoming) close(peer.outgoing) return nil - case m := <-peer.incoming: - if m == nil { - continue + case e := <-peer.incoming: + switch e.MsgType { + case FSM_MSG_STATE_CHANGE: + nextState := e.MsgData.(bgp.FSMState) + // waits for all goroutines created for the current state + h.Wait() + oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) + peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState) + peer.fsm.StateChange(nextState) + sameState = false + if nextState == bgp.BGP_FSM_ESTABLISHED { + pathList := peer.adjRib.GetOutPathList(peer.rf) + peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) + peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now() + peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ + if oldState >= bgp.BGP_FSM_OPENSENT { + peer.peerInfo.VersionNum++ + } + } + if oldState == bgp.BGP_FSM_ESTABLISHED { + peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{} + peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo) + } + case FSM_MSG_BGP_MESSAGE: + peer.handleBGPmessage(e.MsgData.(*bgp.BGPMessage)) } - peer.handleBGPmessage(m) case m := <-peer.inEventCh: peer.handlePeermessage(m) } |