diff options
-rw-r--r-- | server/fsm.go | 53 | ||||
-rw-r--r-- | server/peer.go | 55 |
2 files changed, 64 insertions, 44 deletions
diff --git a/server/fsm.go b/server/fsm.go index 3080dca4..21c05399 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -24,16 +24,28 @@ import ( "time" ) +type fsmMsgType int + +const ( + _ fsmMsgType = iota + FSM_MSG_STATE_CHANGE + FSM_MSG_BGP_MESSAGE +) + +type fsmMsg struct { + MsgType fsmMsgType + MsgData interface{} +} + type FSM struct { globalConfig *config.GlobalType peerConfig *config.NeighborType keepaliveTicker *time.Ticker state bgp.FSMState - incoming chan *bgp.BGPMessage + incoming chan *fsmMsg outgoing chan *bgp.BGPMessage passiveConn *net.TCPConn passiveConnCh chan *net.TCPConn - stateCh chan bgp.FSMState } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -78,7 +90,7 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } } -func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *bgp.BGPMessage, outgoing chan *bgp.BGPMessage) *FSM { +func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSM { return &FSM{ globalConfig: gConfig, peerConfig: pConfig, @@ -86,14 +98,9 @@ func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh cha outgoing: outgoing, state: bgp.BGP_FSM_IDLE, passiveConnCh: connCh, - stateCh: make(chan bgp.FSMState), } } -func (fsm *FSM) StateChanged() chan bgp.FSMState { - return fsm.stateCh -} - func (fsm *FSM) StateChange(nextState bgp.FSMState) { log.Debugf("Peer (%v) state changed from %v to %v", fsm.peerConfig.NeighborAddress, fsm.state, nextState) fsm.state = nextState @@ -103,7 +110,7 @@ type FSMHandler struct { t tomb.Tomb fsm *FSM conn *net.TCPConn - msgCh chan *bgp.BGPMessage + msgCh chan *fsmMsg errorCh chan bool } @@ -208,7 +215,11 @@ func (h *FSMHandler) recvMessageWithError() error { h.errorCh <- true return err } - h.msgCh <- m + e := &fsmMsg{ + MsgType: FSM_MSG_BGP_MESSAGE, + MsgData: m, + } + h.msgCh <- e return nil } @@ -224,7 +235,7 @@ func (h *FSMHandler) opensent() bgp.FSMState { fsm.passiveConn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.msgCh = make(chan *bgp.BGPMessage) + h.msgCh = make(chan *fsmMsg) h.conn = fsm.passiveConn h.t.Go(h.recvMessage) @@ -234,10 +245,15 @@ func (h *FSMHandler) opensent() bgp.FSMState { case <-h.t.Dying(): fsm.passiveConn.Close() return 0 - case m := <-h.msgCh: + case e := <-h.msgCh: + m := e.MsgData.(*bgp.BGPMessage) fsm.bgpMessageStateUpdate(m.Header.Type, true) if m.Header.Type == bgp.BGP_MSG_OPEN { - fsm.incoming <- m + e := &fsmMsg{ + MsgType: FSM_MSG_BGP_MESSAGE, + MsgData: m, + } + fsm.incoming <- e msg := bgp.NewBGPKeepAliveMessage() b, _ := msg.Serialize() fsm.passiveConn.Write(b) @@ -256,7 +272,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval) fsm.keepaliveTicker = time.NewTicker(sec) - h.msgCh = make(chan *bgp.BGPMessage) + h.msgCh = make(chan *fsmMsg) h.conn = fsm.passiveConn h.t.Go(h.recvMessage) @@ -271,7 +287,8 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { b, _ := m.Serialize() // TODO: check error fsm.passiveConn.Write(b) - case m := <-h.msgCh: + case e := <-h.msgCh: + m := e.MsgData.(*bgp.BGPMessage) nextState := bgp.BGP_FSM_IDLE fsm.bgpMessageStateUpdate(m.Header.Type, true) if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { @@ -366,7 +383,11 @@ func (h *FSMHandler) loop() error { // zero means that tomb.Dying() if nextState >= bgp.BGP_FSM_IDLE { - fsm.stateCh <- nextState + e := &fsmMsg{ + MsgType: FSM_MSG_STATE_CHANGE, + MsgData: nextState, + } + fsm.incoming <- e } return nil } diff --git a/server/peer.go b/server/peer.go index 88a327e7..6f18f4ec 100644 --- a/server/peer.go +++ b/server/peer.go @@ -32,7 +32,7 @@ type Peer struct { globalConfig config.GlobalType peerConfig config.NeighborType acceptedConnCh chan *net.TCPConn - incoming chan *bgp.BGPMessage + incoming chan *fsmMsg outgoing chan *bgp.BGPMessage inEventCh chan *message outEventCh chan *message @@ -52,7 +52,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *mes globalConfig: g, peerConfig: peer, acceptedConnCh: make(chan *net.TCPConn), - incoming: make(chan *bgp.BGPMessage, 4096), + incoming: make(chan *fsmMsg, 4096), outgoing: make(chan *bgp.BGPMessage, 4096), inEventCh: make(chan *message, 4096), outEventCh: outEventCh, @@ -114,8 +114,6 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { func (peer *Peer) sendMessages(msgs []*bgp.BGPMessage) { for _, m := range msgs { - // FIXME: there is race where state change - // (established) event arrived before open message if peer.peerConfig.BgpNeighborCommonState.State != uint32(bgp.BGP_FSM_ESTABLISHED) { continue } @@ -175,37 +173,38 @@ func (peer *Peer) loop() error { sameState := true for sameState { select { - case nextState := <-peer.fsm.StateChanged(): - // waits for all goroutines created for the current state - h.Wait() - oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) - peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState) - peer.fsm.StateChange(nextState) - sameState = false - if nextState == bgp.BGP_FSM_ESTABLISHED { - pathList := peer.adjRib.GetOutPathList(peer.rf) - peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) - peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now() - peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ - if oldState >= bgp.BGP_FSM_OPENSENT { - peer.peerInfo.VersionNum++ - } - } - if oldState == bgp.BGP_FSM_ESTABLISHED { - peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{} - peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo) - } case <-peer.t.Dying(): close(peer.acceptedConnCh) h.Stop() close(peer.incoming) close(peer.outgoing) return nil - case m := <-peer.incoming: - if m == nil { - continue + case e := <-peer.incoming: + switch e.MsgType { + case FSM_MSG_STATE_CHANGE: + nextState := e.MsgData.(bgp.FSMState) + // waits for all goroutines created for the current state + h.Wait() + oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) + peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState) + peer.fsm.StateChange(nextState) + sameState = false + if nextState == bgp.BGP_FSM_ESTABLISHED { + pathList := peer.adjRib.GetOutPathList(peer.rf) + peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) + peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now() + peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ + if oldState >= bgp.BGP_FSM_OPENSENT { + peer.peerInfo.VersionNum++ + } + } + if oldState == bgp.BGP_FSM_ESTABLISHED { + peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{} + peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo) + } + case FSM_MSG_BGP_MESSAGE: + peer.handleBGPmessage(e.MsgData.(*bgp.BGPMessage)) } - peer.handleBGPmessage(m) case m := <-peer.inEventCh: peer.handlePeermessage(m) } |