diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-06-09 21:20:07 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-06-09 23:20:15 +0900 |
commit | ca832f94bbb1c8e0cecfd7118366a48159512e23 (patch) | |
tree | 7197e7bfb41d612be3549ddfa8bd2f3f2c71f2db /server/fsm.go | |
parent | a97129f1400f2be76942124f535fb9831063aa5a (diff) |
server: kill peerMsg
Peers send and receive messages via channels, which could lead to a
deadlock. With this patch, multiple goroutines are used for network
I/Os per peer but one goroutine handle all ribs (including the global
rib). So there is no messages via channels between peers.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/fsm.go')
-rw-r--r-- | server/fsm.go | 106 |
1 files changed, 85 insertions, 21 deletions
diff --git a/server/fsm.go b/server/fsm.go index 0cbdd84e..2ebe8047 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -23,6 +23,7 @@ import ( "gopkg.in/tomb.v2" "io" "net" + "strconv" "time" ) @@ -36,6 +37,7 @@ const ( type fsmMsg struct { MsgType fsmMsgType + MsgSrc string MsgData interface{} } @@ -63,6 +65,7 @@ func (s AdminState) String() string { } type FSM struct { + t tomb.Tomb globalConfig *config.Global peerConfig *config.Neighbor keepaliveTicker *time.Ticker @@ -74,6 +77,8 @@ type FSM struct { negotiatedHoldTime float64 adminState AdminState adminStateCh chan AdminState + getActiveCh chan struct{} + h *FSMHandler } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -124,16 +129,19 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } } -func NewFSM(gConfig *config.Global, pConfig *config.Neighbor, connCh chan net.Conn) *FSM { - return &FSM{ +func NewFSM(gConfig *config.Global, pConfig *config.Neighbor) *FSM { + fsm := &FSM{ globalConfig: gConfig, peerConfig: pConfig, state: bgp.BGP_FSM_IDLE, - connCh: connCh, + connCh: make(chan net.Conn), opensentHoldTime: float64(HOLDTIME_OPENSENT), adminState: ADMIN_STATE_UP, adminStateCh: make(chan AdminState, 1), + getActiveCh: make(chan struct{}), } + fsm.t.Go(fsm.connectLoop) + return fsm } func (fsm *FSM) StateChange(nextState bgp.FSMState) { @@ -144,6 +152,18 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) { "new": nextState.String(), }).Debug("state changed") fsm.state = nextState + switch nextState { + case bgp.BGP_FSM_ESTABLISHED: + fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix() + fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ + case bgp.BGP_FSM_ACTIVE: + if !fsm.peerConfig.TransportOptions.PassiveMode { + fsm.getActiveCh <- struct{}{} + } + fallthrough + default: + fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix() + } } func (fsm *FSM) LocalAddr() net.IP { @@ -179,6 +199,55 @@ func (fsm *FSM) sendNotification(conn net.Conn, code, subType uint8, data []byte fsm.sendNotificatonFromErrorMsg(conn, e.(*bgp.MessageError)) } +func (fsm *FSM) connectLoop() error { + var tick int + if tick = int(fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY { + tick = MIN_CONNECT_RETRY + } + + ticker := time.NewTicker(time.Duration(tick) * time.Second) + ticker.Stop() + + connect := func() { + if bgp.FSMState(fsm.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE { + var host string + addr := fsm.peerConfig.NeighborAddress + + if addr.To4() != nil { + host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT) + } else { + host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT) + } + + conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second) + if err == nil { + fsm.connCh <- conn + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + }).Debugf("failed to connect: %s", err) + } + } + } + + for { + select { + case <-fsm.t.Dying(): + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + }).Debug("stop connect loop") + ticker.Stop() + return nil + case <-ticker.C: + connect() + case <-fsm.getActiveCh: + ticker = time.NewTicker(time.Duration(tick) * time.Second) + } + } +} + type FSMHandler struct { t tomb.Tomb fsm *FSM @@ -203,15 +272,6 @@ func NewFSMHandler(fsm *FSM, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessag return f } -func (h *FSMHandler) Wait() error { - return h.t.Wait() -} - -func (h *FSMHandler) Stop() error { - h.t.Kill(nil) - return h.t.Wait() -} - func (h *FSMHandler) idle() bgp.FSMState { fsm := h.fsm @@ -354,6 +414,7 @@ func (h *FSMHandler) recvMessageWithError() error { }).Warn("malformed BGP Header") h.msgCh <- &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.peerConfig.NeighborAddress.String(), MsgData: err, } return err @@ -381,11 +442,13 @@ func (h *FSMHandler) recvMessageWithError() error { }).Warn("malformed BGP message") fmsg = &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.peerConfig.NeighborAddress.String(), MsgData: err, } } else { fmsg = &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.peerConfig.NeighborAddress.String(), MsgData: m, } if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { @@ -456,6 +519,7 @@ func (h *FSMHandler) opensent() bgp.FSMState { e := &fsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: fsm.peerConfig.NeighborAddress.String(), MsgData: m, } h.incoming <- e @@ -656,12 +720,12 @@ func (h *FSMHandler) sendMessageloop() error { // connection is closed and tried to kill us, // we need to die immediately. Otherwise fms // doesn't go to idle. - for len(h.outgoing) > 0 { - m := <-h.outgoing - err := send(m) - if err != nil { - return nil - } + // + // we always try to send. in case b), the + // connection was already closed so it + // correctly works in both cases. + if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { + send(bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)) } return nil case m := <-h.outgoing: @@ -759,8 +823,8 @@ func (h *FSMHandler) loop() error { switch fsm.state { case bgp.BGP_FSM_IDLE: nextState = h.idle() - // case bgp.BGP_FSM_CONNECT: - // return h.connect() + // case bgp.BGP_FSM_CONNECT: + // nextState = h.connect() case bgp.BGP_FSM_ACTIVE: nextState = h.active() case bgp.BGP_FSM_OPENSENT: @@ -790,6 +854,7 @@ func (h *FSMHandler) loop() error { if nextState >= bgp.BGP_FSM_IDLE { e := &fsmMsg{ MsgType: FSM_MSG_STATE_CHANGE, + MsgSrc: fsm.peerConfig.NeighborAddress.String(), MsgData: nextState, } h.incoming <- e @@ -800,7 +865,6 @@ func (h *FSMHandler) loop() error { func (h *FSMHandler) changeAdminState(s AdminState) error { fsm := h.fsm if fsm.adminState != s { - log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.peerConfig.NeighborAddress, |