summaryrefslogtreecommitdiffhomepage
path: root/server/fsm.go
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-06-09 21:20:07 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-06-09 23:20:15 +0900
commitca832f94bbb1c8e0cecfd7118366a48159512e23 (patch)
tree7197e7bfb41d612be3549ddfa8bd2f3f2c71f2db /server/fsm.go
parenta97129f1400f2be76942124f535fb9831063aa5a (diff)
server: kill peerMsg
Peers send and receive messages via channels, which could lead to a deadlock. With this patch, multiple goroutines are used for network I/Os per peer but one goroutine handle all ribs (including the global rib). So there is no messages via channels between peers. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/fsm.go')
-rw-r--r--server/fsm.go106
1 files changed, 85 insertions, 21 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 0cbdd84e..2ebe8047 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -23,6 +23,7 @@ import (
"gopkg.in/tomb.v2"
"io"
"net"
+ "strconv"
"time"
)
@@ -36,6 +37,7 @@ const (
type fsmMsg struct {
MsgType fsmMsgType
+ MsgSrc string
MsgData interface{}
}
@@ -63,6 +65,7 @@ func (s AdminState) String() string {
}
type FSM struct {
+ t tomb.Tomb
globalConfig *config.Global
peerConfig *config.Neighbor
keepaliveTicker *time.Ticker
@@ -74,6 +77,8 @@ type FSM struct {
negotiatedHoldTime float64
adminState AdminState
adminStateCh chan AdminState
+ getActiveCh chan struct{}
+ h *FSMHandler
}
func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
@@ -124,16 +129,19 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
}
-func NewFSM(gConfig *config.Global, pConfig *config.Neighbor, connCh chan net.Conn) *FSM {
- return &FSM{
+func NewFSM(gConfig *config.Global, pConfig *config.Neighbor) *FSM {
+ fsm := &FSM{
globalConfig: gConfig,
peerConfig: pConfig,
state: bgp.BGP_FSM_IDLE,
- connCh: connCh,
+ connCh: make(chan net.Conn),
opensentHoldTime: float64(HOLDTIME_OPENSENT),
adminState: ADMIN_STATE_UP,
adminStateCh: make(chan AdminState, 1),
+ getActiveCh: make(chan struct{}),
}
+ fsm.t.Go(fsm.connectLoop)
+ return fsm
}
func (fsm *FSM) StateChange(nextState bgp.FSMState) {
@@ -144,6 +152,18 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) {
"new": nextState.String(),
}).Debug("state changed")
fsm.state = nextState
+ switch nextState {
+ case bgp.BGP_FSM_ESTABLISHED:
+ fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix()
+ fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
+ case bgp.BGP_FSM_ACTIVE:
+ if !fsm.peerConfig.TransportOptions.PassiveMode {
+ fsm.getActiveCh <- struct{}{}
+ }
+ fallthrough
+ default:
+ fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix()
+ }
}
func (fsm *FSM) LocalAddr() net.IP {
@@ -179,6 +199,55 @@ func (fsm *FSM) sendNotification(conn net.Conn, code, subType uint8, data []byte
fsm.sendNotificatonFromErrorMsg(conn, e.(*bgp.MessageError))
}
+func (fsm *FSM) connectLoop() error {
+ var tick int
+ if tick = int(fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY {
+ tick = MIN_CONNECT_RETRY
+ }
+
+ ticker := time.NewTicker(time.Duration(tick) * time.Second)
+ ticker.Stop()
+
+ connect := func() {
+ if bgp.FSMState(fsm.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE {
+ var host string
+ addr := fsm.peerConfig.NeighborAddress
+
+ if addr.To4() != nil {
+ host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT)
+ } else {
+ host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT)
+ }
+
+ conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second)
+ if err == nil {
+ fsm.connCh <- conn
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": fsm.peerConfig.NeighborAddress,
+ }).Debugf("failed to connect: %s", err)
+ }
+ }
+ }
+
+ for {
+ select {
+ case <-fsm.t.Dying():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": fsm.peerConfig.NeighborAddress,
+ }).Debug("stop connect loop")
+ ticker.Stop()
+ return nil
+ case <-ticker.C:
+ connect()
+ case <-fsm.getActiveCh:
+ ticker = time.NewTicker(time.Duration(tick) * time.Second)
+ }
+ }
+}
+
type FSMHandler struct {
t tomb.Tomb
fsm *FSM
@@ -203,15 +272,6 @@ func NewFSMHandler(fsm *FSM, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessag
return f
}
-func (h *FSMHandler) Wait() error {
- return h.t.Wait()
-}
-
-func (h *FSMHandler) Stop() error {
- h.t.Kill(nil)
- return h.t.Wait()
-}
-
func (h *FSMHandler) idle() bgp.FSMState {
fsm := h.fsm
@@ -354,6 +414,7 @@ func (h *FSMHandler) recvMessageWithError() error {
}).Warn("malformed BGP Header")
h.msgCh <- &fsmMsg{
MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgSrc: h.fsm.peerConfig.NeighborAddress.String(),
MsgData: err,
}
return err
@@ -381,11 +442,13 @@ func (h *FSMHandler) recvMessageWithError() error {
}).Warn("malformed BGP message")
fmsg = &fsmMsg{
MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgSrc: h.fsm.peerConfig.NeighborAddress.String(),
MsgData: err,
}
} else {
fmsg = &fsmMsg{
MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgSrc: h.fsm.peerConfig.NeighborAddress.String(),
MsgData: m,
}
if h.fsm.state == bgp.BGP_FSM_ESTABLISHED {
@@ -456,6 +519,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
e := &fsmMsg{
MsgType: FSM_MSG_BGP_MESSAGE,
+ MsgSrc: fsm.peerConfig.NeighborAddress.String(),
MsgData: m,
}
h.incoming <- e
@@ -656,12 +720,12 @@ func (h *FSMHandler) sendMessageloop() error {
// connection is closed and tried to kill us,
// we need to die immediately. Otherwise fms
// doesn't go to idle.
- for len(h.outgoing) > 0 {
- m := <-h.outgoing
- err := send(m)
- if err != nil {
- return nil
- }
+ //
+ // we always try to send. in case b), the
+ // connection was already closed so it
+ // correctly works in both cases.
+ if h.fsm.state == bgp.BGP_FSM_ESTABLISHED {
+ send(bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil))
}
return nil
case m := <-h.outgoing:
@@ -759,8 +823,8 @@ func (h *FSMHandler) loop() error {
switch fsm.state {
case bgp.BGP_FSM_IDLE:
nextState = h.idle()
- // case bgp.BGP_FSM_CONNECT:
- // return h.connect()
+ // case bgp.BGP_FSM_CONNECT:
+ // nextState = h.connect()
case bgp.BGP_FSM_ACTIVE:
nextState = h.active()
case bgp.BGP_FSM_OPENSENT:
@@ -790,6 +854,7 @@ func (h *FSMHandler) loop() error {
if nextState >= bgp.BGP_FSM_IDLE {
e := &fsmMsg{
MsgType: FSM_MSG_STATE_CHANGE,
+ MsgSrc: fsm.peerConfig.NeighborAddress.String(),
MsgData: nextState,
}
h.incoming <- e
@@ -800,7 +865,6 @@ func (h *FSMHandler) loop() error {
func (h *FSMHandler) changeAdminState(s AdminState) error {
fsm := h.fsm
if fsm.adminState != s {
-
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": fsm.peerConfig.NeighborAddress,