summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/fsm.go53
-rw-r--r--server/peer.go55
2 files changed, 64 insertions, 44 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 3080dca4..21c05399 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -24,16 +24,28 @@ import (
"time"
)
+type fsmMsgType int
+
+const (
+ _ fsmMsgType = iota
+ FSM_MSG_STATE_CHANGE
+ FSM_MSG_BGP_MESSAGE
+)
+
+type fsmMsg struct {
+ MsgType fsmMsgType
+ MsgData interface{}
+}
+
type FSM struct {
globalConfig *config.GlobalType
peerConfig *config.NeighborType
keepaliveTicker *time.Ticker
state bgp.FSMState
- incoming chan *bgp.BGPMessage
+ incoming chan *fsmMsg
outgoing chan *bgp.BGPMessage
passiveConn *net.TCPConn
passiveConnCh chan *net.TCPConn
- stateCh chan bgp.FSMState
}
func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
@@ -78,7 +90,7 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
}
-func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *bgp.BGPMessage, outgoing chan *bgp.BGPMessage) *FSM {
+func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSM {
return &FSM{
globalConfig: gConfig,
peerConfig: pConfig,
@@ -86,14 +98,9 @@ func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh cha
outgoing: outgoing,
state: bgp.BGP_FSM_IDLE,
passiveConnCh: connCh,
- stateCh: make(chan bgp.FSMState),
}
}
-func (fsm *FSM) StateChanged() chan bgp.FSMState {
- return fsm.stateCh
-}
-
func (fsm *FSM) StateChange(nextState bgp.FSMState) {
log.Debugf("Peer (%v) state changed from %v to %v", fsm.peerConfig.NeighborAddress, fsm.state, nextState)
fsm.state = nextState
@@ -103,7 +110,7 @@ type FSMHandler struct {
t tomb.Tomb
fsm *FSM
conn *net.TCPConn
- msgCh chan *bgp.BGPMessage
+ msgCh chan *fsmMsg
errorCh chan bool
}
@@ -208,7 +215,11 @@ func (h *FSMHandler) recvMessageWithError() error {
h.errorCh <- true
return err
}
- h.msgCh <- m
+ e := &fsmMsg{
+ MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgData: m,
+ }
+ h.msgCh <- e
return nil
}
@@ -224,7 +235,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
fsm.passiveConn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- h.msgCh = make(chan *bgp.BGPMessage)
+ h.msgCh = make(chan *fsmMsg)
h.conn = fsm.passiveConn
h.t.Go(h.recvMessage)
@@ -234,10 +245,15 @@ func (h *FSMHandler) opensent() bgp.FSMState {
case <-h.t.Dying():
fsm.passiveConn.Close()
return 0
- case m := <-h.msgCh:
+ case e := <-h.msgCh:
+ m := e.MsgData.(*bgp.BGPMessage)
fsm.bgpMessageStateUpdate(m.Header.Type, true)
if m.Header.Type == bgp.BGP_MSG_OPEN {
- fsm.incoming <- m
+ e := &fsmMsg{
+ MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgData: m,
+ }
+ fsm.incoming <- e
msg := bgp.NewBGPKeepAliveMessage()
b, _ := msg.Serialize()
fsm.passiveConn.Write(b)
@@ -256,7 +272,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval)
fsm.keepaliveTicker = time.NewTicker(sec)
- h.msgCh = make(chan *bgp.BGPMessage)
+ h.msgCh = make(chan *fsmMsg)
h.conn = fsm.passiveConn
h.t.Go(h.recvMessage)
@@ -271,7 +287,8 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
b, _ := m.Serialize()
// TODO: check error
fsm.passiveConn.Write(b)
- case m := <-h.msgCh:
+ case e := <-h.msgCh:
+ m := e.MsgData.(*bgp.BGPMessage)
nextState := bgp.BGP_FSM_IDLE
fsm.bgpMessageStateUpdate(m.Header.Type, true)
if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
@@ -366,7 +383,11 @@ func (h *FSMHandler) loop() error {
// zero means that tomb.Dying()
if nextState >= bgp.BGP_FSM_IDLE {
- fsm.stateCh <- nextState
+ e := &fsmMsg{
+ MsgType: FSM_MSG_STATE_CHANGE,
+ MsgData: nextState,
+ }
+ fsm.incoming <- e
}
return nil
}
diff --git a/server/peer.go b/server/peer.go
index 88a327e7..6f18f4ec 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -32,7 +32,7 @@ type Peer struct {
globalConfig config.GlobalType
peerConfig config.NeighborType
acceptedConnCh chan *net.TCPConn
- incoming chan *bgp.BGPMessage
+ incoming chan *fsmMsg
outgoing chan *bgp.BGPMessage
inEventCh chan *message
outEventCh chan *message
@@ -52,7 +52,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *mes
globalConfig: g,
peerConfig: peer,
acceptedConnCh: make(chan *net.TCPConn),
- incoming: make(chan *bgp.BGPMessage, 4096),
+ incoming: make(chan *fsmMsg, 4096),
outgoing: make(chan *bgp.BGPMessage, 4096),
inEventCh: make(chan *message, 4096),
outEventCh: outEventCh,
@@ -114,8 +114,6 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) {
func (peer *Peer) sendMessages(msgs []*bgp.BGPMessage) {
for _, m := range msgs {
- // FIXME: there is race where state change
- // (established) event arrived before open message
if peer.peerConfig.BgpNeighborCommonState.State != uint32(bgp.BGP_FSM_ESTABLISHED) {
continue
}
@@ -175,37 +173,38 @@ func (peer *Peer) loop() error {
sameState := true
for sameState {
select {
- case nextState := <-peer.fsm.StateChanged():
- // waits for all goroutines created for the current state
- h.Wait()
- oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State)
- peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState)
- peer.fsm.StateChange(nextState)
- sameState = false
- if nextState == bgp.BGP_FSM_ESTABLISHED {
- pathList := peer.adjRib.GetOutPathList(peer.rf)
- peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
- peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now()
- peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
- if oldState >= bgp.BGP_FSM_OPENSENT {
- peer.peerInfo.VersionNum++
- }
- }
- if oldState == bgp.BGP_FSM_ESTABLISHED {
- peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{}
- peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo)
- }
case <-peer.t.Dying():
close(peer.acceptedConnCh)
h.Stop()
close(peer.incoming)
close(peer.outgoing)
return nil
- case m := <-peer.incoming:
- if m == nil {
- continue
+ case e := <-peer.incoming:
+ switch e.MsgType {
+ case FSM_MSG_STATE_CHANGE:
+ nextState := e.MsgData.(bgp.FSMState)
+ // waits for all goroutines created for the current state
+ h.Wait()
+ oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State)
+ peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState)
+ peer.fsm.StateChange(nextState)
+ sameState = false
+ if nextState == bgp.BGP_FSM_ESTABLISHED {
+ pathList := peer.adjRib.GetOutPathList(peer.rf)
+ peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
+ peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now()
+ peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
+ if oldState >= bgp.BGP_FSM_OPENSENT {
+ peer.peerInfo.VersionNum++
+ }
+ }
+ if oldState == bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{}
+ peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo)
+ }
+ case FSM_MSG_BGP_MESSAGE:
+ peer.handleBGPmessage(e.MsgData.(*bgp.BGPMessage))
}
- peer.handleBGPmessage(m)
case m := <-peer.inEventCh:
peer.handlePeermessage(m)
}