summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-01-03 13:09:35 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-01-03 13:09:35 +0900
commitf62b27837e98501743aff07b62cf5ae46804bdd0 (patch)
tree2b5177141527cc665059b3d7e9f18f9de8695f6b /server
parent10dfd44a53a3453b4ab83b39bfb5b759b9d43a47 (diff)
server: merge eventCh and incoming channels
We need the order of events on two channles so this patch merges two channels. For example, we need to get received OpenMessage before moving to establish state. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/fsm.go53
-rw-r--r--server/peer.go55
2 files changed, 64 insertions, 44 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 3080dca4..21c05399 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -24,16 +24,28 @@ import (
"time"
)
+type fsmMsgType int
+
+const (
+ _ fsmMsgType = iota
+ FSM_MSG_STATE_CHANGE
+ FSM_MSG_BGP_MESSAGE
+)
+
+type fsmMsg struct {
+ MsgType fsmMsgType
+ MsgData interface{}
+}
+
type FSM struct {
globalConfig *config.GlobalType
peerConfig *config.NeighborType
keepaliveTicker *time.Ticker
state bgp.FSMState
- incoming chan *bgp.BGPMessage
+ incoming chan *fsmMsg
outgoing chan *bgp.BGPMessage
passiveConn *net.TCPConn
passiveConnCh chan *net.TCPConn
- stateCh chan bgp.FSMState
}
func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
@@ -78,7 +90,7 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
}
-func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *bgp.BGPMessage, outgoing chan *bgp.BGPMessage) *FSM {
+func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSM {
return &FSM{
globalConfig: gConfig,
peerConfig: pConfig,
@@ -86,14 +98,9 @@ func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh cha
outgoing: outgoing,
state: bgp.BGP_FSM_IDLE,
passiveConnCh: connCh,
- stateCh: make(chan bgp.FSMState),
}
}
-func (fsm *FSM) StateChanged() chan bgp.FSMState {
- return fsm.stateCh
-}
-
func (fsm *FSM) StateChange(nextState bgp.FSMState) {
log.Debugf("Peer (%v) state changed from %v to %v", fsm.peerConfig.NeighborAddress, fsm.state, nextState)
fsm.state = nextState
@@ -103,7 +110,7 @@ type FSMHandler struct {
t tomb.Tomb
fsm *FSM
conn *net.TCPConn
- msgCh chan *bgp.BGPMessage
+ msgCh chan *fsmMsg
errorCh chan bool
}
@@ -208,7 +215,11 @@ func (h *FSMHandler) recvMessageWithError() error {
h.errorCh <- true
return err
}
- h.msgCh <- m
+ e := &fsmMsg{
+ MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgData: m,
+ }
+ h.msgCh <- e
return nil
}
@@ -224,7 +235,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
fsm.passiveConn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- h.msgCh = make(chan *bgp.BGPMessage)
+ h.msgCh = make(chan *fsmMsg)
h.conn = fsm.passiveConn
h.t.Go(h.recvMessage)
@@ -234,10 +245,15 @@ func (h *FSMHandler) opensent() bgp.FSMState {
case <-h.t.Dying():
fsm.passiveConn.Close()
return 0
- case m := <-h.msgCh:
+ case e := <-h.msgCh:
+ m := e.MsgData.(*bgp.BGPMessage)
fsm.bgpMessageStateUpdate(m.Header.Type, true)
if m.Header.Type == bgp.BGP_MSG_OPEN {
- fsm.incoming <- m
+ e := &fsmMsg{
+ MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgData: m,
+ }
+ fsm.incoming <- e
msg := bgp.NewBGPKeepAliveMessage()
b, _ := msg.Serialize()
fsm.passiveConn.Write(b)
@@ -256,7 +272,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval)
fsm.keepaliveTicker = time.NewTicker(sec)
- h.msgCh = make(chan *bgp.BGPMessage)
+ h.msgCh = make(chan *fsmMsg)
h.conn = fsm.passiveConn
h.t.Go(h.recvMessage)
@@ -271,7 +287,8 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
b, _ := m.Serialize()
// TODO: check error
fsm.passiveConn.Write(b)
- case m := <-h.msgCh:
+ case e := <-h.msgCh:
+ m := e.MsgData.(*bgp.BGPMessage)
nextState := bgp.BGP_FSM_IDLE
fsm.bgpMessageStateUpdate(m.Header.Type, true)
if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
@@ -366,7 +383,11 @@ func (h *FSMHandler) loop() error {
// zero means that tomb.Dying()
if nextState >= bgp.BGP_FSM_IDLE {
- fsm.stateCh <- nextState
+ e := &fsmMsg{
+ MsgType: FSM_MSG_STATE_CHANGE,
+ MsgData: nextState,
+ }
+ fsm.incoming <- e
}
return nil
}
diff --git a/server/peer.go b/server/peer.go
index 88a327e7..6f18f4ec 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -32,7 +32,7 @@ type Peer struct {
globalConfig config.GlobalType
peerConfig config.NeighborType
acceptedConnCh chan *net.TCPConn
- incoming chan *bgp.BGPMessage
+ incoming chan *fsmMsg
outgoing chan *bgp.BGPMessage
inEventCh chan *message
outEventCh chan *message
@@ -52,7 +52,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *mes
globalConfig: g,
peerConfig: peer,
acceptedConnCh: make(chan *net.TCPConn),
- incoming: make(chan *bgp.BGPMessage, 4096),
+ incoming: make(chan *fsmMsg, 4096),
outgoing: make(chan *bgp.BGPMessage, 4096),
inEventCh: make(chan *message, 4096),
outEventCh: outEventCh,
@@ -114,8 +114,6 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) {
func (peer *Peer) sendMessages(msgs []*bgp.BGPMessage) {
for _, m := range msgs {
- // FIXME: there is race where state change
- // (established) event arrived before open message
if peer.peerConfig.BgpNeighborCommonState.State != uint32(bgp.BGP_FSM_ESTABLISHED) {
continue
}
@@ -175,37 +173,38 @@ func (peer *Peer) loop() error {
sameState := true
for sameState {
select {
- case nextState := <-peer.fsm.StateChanged():
- // waits for all goroutines created for the current state
- h.Wait()
- oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State)
- peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState)
- peer.fsm.StateChange(nextState)
- sameState = false
- if nextState == bgp.BGP_FSM_ESTABLISHED {
- pathList := peer.adjRib.GetOutPathList(peer.rf)
- peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
- peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now()
- peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
- if oldState >= bgp.BGP_FSM_OPENSENT {
- peer.peerInfo.VersionNum++
- }
- }
- if oldState == bgp.BGP_FSM_ESTABLISHED {
- peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{}
- peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo)
- }
case <-peer.t.Dying():
close(peer.acceptedConnCh)
h.Stop()
close(peer.incoming)
close(peer.outgoing)
return nil
- case m := <-peer.incoming:
- if m == nil {
- continue
+ case e := <-peer.incoming:
+ switch e.MsgType {
+ case FSM_MSG_STATE_CHANGE:
+ nextState := e.MsgData.(bgp.FSMState)
+ // waits for all goroutines created for the current state
+ h.Wait()
+ oldState := bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State)
+ peer.peerConfig.BgpNeighborCommonState.State = uint32(nextState)
+ peer.fsm.StateChange(nextState)
+ sameState = false
+ if nextState == bgp.BGP_FSM_ESTABLISHED {
+ pathList := peer.adjRib.GetOutPathList(peer.rf)
+ peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
+ peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now()
+ peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
+ if oldState >= bgp.BGP_FSM_OPENSENT {
+ peer.peerInfo.VersionNum++
+ }
+ }
+ if oldState == bgp.BGP_FSM_ESTABLISHED {
+ peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{}
+ peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo)
+ }
+ case FSM_MSG_BGP_MESSAGE:
+ peer.handleBGPmessage(e.MsgData.(*bgp.BGPMessage))
}
- peer.handleBGPmessage(m)
case m := <-peer.inEventCh:
peer.handlePeermessage(m)
}