author:    FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>  2018-12-03 21:04:08 +0900
committer: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>  2018-12-19 20:14:24 +0900
commit:    46e375476196b6e670f6a627448471636dac69e6 (patch)
tree:      94be8d59e6c4d7acff71c1485e53075a302a4cc9 /pkg/server/fsm.go
parent:    598bba9fb2b761d1ad32776b0a61357500227de1 (diff)
server: replace tomb with context
All we need is cancellation. Let's use the standard way to do it: context.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
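
The pattern this commit adopts throughout the handler is plain context cancellation plus a sync.WaitGroup for joining goroutines. A minimal, self-contained sketch of that pattern (the worker names below are illustrative, not GoBGP identifiers):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// runWorker exits when its context is canceled, the same way the new
// fsmHandler goroutines select on ctx.Done() instead of tomb.Dying().
func runWorker(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("worker: stopped")
            return
        case <-ticker.C:
            fmt.Println("worker: tick")
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    wg.Add(1)
    go runWorker(ctx, &wg)

    time.Sleep(3 * time.Second)
    cancel()  // replaces tomb.Kill: ask the goroutine to stop
    wg.Wait() // replaces tomb.Wait: block until it has returned
}

Here cancel plays the role of tomb.Kill and wg.Wait the role of tomb.Wait; each goroutine selects on ctx.Done() exactly as the state functions in the diff below do.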
Diffstat (limited to 'pkg/server/fsm.go')
-rw-r--r--  pkg/server/fsm.go | 391
1 file changed, 208 insertions(+), 183 deletions(-)
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go
index c9198af1..2440f65a 100644
--- a/pkg/server/fsm.go
+++ b/pkg/server/fsm.go
@@ -16,6 +16,7 @@
package server
import (
+ "context"
"fmt"
"io"
"math/rand"
@@ -32,7 +33,6 @@ import (
"github.com/osrg/gobgp/pkg/packet/bmp"
log "github.com/sirupsen/logrus"
- "gopkg.in/tomb.v2"
)
type peerDownReason int
@@ -190,7 +190,6 @@ type adminStateOperation struct {
var fsmVersion uint
type fsm struct {
- t tomb.Tomb
gConf *config.Global
pConf *config.Neighbor
lock sync.RWMutex
@@ -202,7 +201,6 @@ type fsm struct {
opensentHoldTime float64
adminState adminState
adminStateCh chan adminStateOperation
- getActiveCh chan struct{}
h *fsmHandler
rfMap map[bgp.RouteFamily]bgp.BGPAddPathMode
capMap map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface
@@ -296,7 +294,6 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
opensentHoldTime: float64(holdtimeOpensent),
adminState: adminState,
adminStateCh: make(chan adminStateOperation, 1),
- getActiveCh: make(chan struct{}),
rfMap: make(map[bgp.RouteFamily]bgp.BGPAddPathMode),
capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface),
peerInfo: table.NewPeerInfo(gConf, pConf),
@@ -305,7 +302,6 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
version: fsmVersion,
}
fsm.gracefulRestartTimer.Stop()
- fsm.t.Go(fsm.connectLoop)
return fsm
}
@@ -343,11 +339,6 @@ func (fsm *fsm) StateChange(nextState bgp.FSMState) {
if !y {
fsm.twoByteAsTrans = true
}
- case bgp.BGP_FSM_ACTIVE:
- if !fsm.pConf.Transport.Config.PassiveMode {
- fsm.getActiveCh <- struct{}{}
- }
- fallthrough
default:
fsm.pConf.Timers.State.Downtime = time.Now().Unix()
}
@@ -403,106 +394,7 @@ func (fsm *fsm) sendNotification(code, subType uint8, data []byte, msg string) (
return fsm.sendNotificationFromErrorMsg(e.(*bgp.MessageError))
}
-func (fsm *fsm) connectLoop() error {
- fsm.lock.RLock()
- tick := int(fsm.pConf.Timers.Config.ConnectRetry)
- fsm.lock.RUnlock()
- if tick < minConnectRetry {
- tick = minConnectRetry
- }
-
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
-
- timer := time.NewTimer(time.Duration(tick) * time.Second)
- timer.Stop()
-
- connect := func() {
- fsm.lock.RLock()
- defer fsm.lock.RUnlock()
-
- addr := fsm.pConf.State.NeighborAddress
- port := int(bgp.BGP_PORT)
- if fsm.pConf.Transport.Config.RemotePort != 0 {
- port = int(fsm.pConf.Transport.Config.RemotePort)
- }
- laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(fsm.pConf.Transport.Config.LocalAddress, "0"))
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Warnf("failed to resolve local address: %s", err)
- return
- }
- password := fsm.pConf.Config.AuthPassword
- ttl := uint8(0)
- ttlMin := uint8(0)
-
- if fsm.pConf.TtlSecurity.Config.Enabled {
- ttl = 255
- ttlMin = fsm.pConf.TtlSecurity.Config.TtlMin
- } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL {
- ttl = 1
- if fsm.pConf.EbgpMultihop.Config.Enabled {
- ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl
- }
- }
-
- d := net.Dialer{
- LocalAddr: laddr,
- Timeout: time.Duration(minConnectRetry-1) * time.Second,
- Control: func(network, address string, c syscall.RawConn) error {
- return dialerControl(network, address, c, ttl, ttlMin, password)
- },
- }
- conn, err := d.Dial("tcp", net.JoinHostPort(addr, fmt.Sprintf("%d", port)))
- if err == nil {
- select {
- case fsm.connCh <- conn:
- return
- default:
- conn.Close()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Warn("active conn is closed to avoid being blocked")
- }
- } else {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Debugf("failed to connect: %s", err)
- }
-
- if fsm.state == bgp.BGP_FSM_ACTIVE {
- timer.Reset(time.Duration(tick) * time.Second)
- }
- }
-
- for {
- select {
- case <-fsm.t.Dying():
- fsm.lock.RLock()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": fsm.pConf.State.NeighborAddress,
- }).Debug("stop connect loop")
- fsm.lock.RUnlock()
- return nil
- case <-timer.C:
- fsm.lock.RLock()
- ready := fsm.state == bgp.BGP_FSM_ACTIVE
- fsm.lock.RUnlock()
- if ready {
- go connect()
- }
- case <-fsm.getActiveCh:
- timer.Reset(time.Duration(r.Intn(minConnectRetry)+minConnectRetry) * time.Second)
- }
- }
-}
-
type fsmHandler struct {
- t tomb.Tomb
fsm *fsm
conn net.Conn
msgCh *channels.InfiniteChannel
@@ -512,9 +404,13 @@ type fsmHandler struct {
outgoing *channels.InfiniteChannel
holdTimerResetCh chan bool
sentNotification *bgp.BGPMessage
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ wg *sync.WaitGroup
}
func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *fsmMsg, outgoing *channels.InfiniteChannel) *fsmHandler {
+ ctx, cancel := context.WithCancel(context.Background())
h := &fsmHandler{
fsm: fsm,
stateReasonCh: make(chan fsmStateReason, 2),
@@ -522,12 +418,16 @@ func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *f
stateCh: stateCh,
outgoing: outgoing,
holdTimerResetCh: make(chan bool, 2),
+ wg: &sync.WaitGroup{},
+ ctx: ctx,
+ ctxCancel: cancel,
}
- fsm.t.Go(h.loop)
+ h.wg.Add(1)
+ go h.loop(ctx, h.wg)
return h
}
-func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) idle(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
fsm.lock.RLock()
@@ -536,7 +436,7 @@ func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
return -1, newfsmStateReason(fsmDying, nil, nil)
case <-fsm.gracefulRestartTimer.C:
fsm.lock.RLock()
@@ -605,11 +505,123 @@ func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) {
}
}
-func (h *fsmHandler) active() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) connectLoop(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
fsm := h.fsm
+
+ tick, addr, port, password, ttl, ttlMin, localAddress := func() (int, string, int, string, uint8, uint8, string) {
+ fsm.lock.RLock()
+ defer fsm.lock.RUnlock()
+
+ tick := int(fsm.pConf.Timers.Config.ConnectRetry)
+ if tick < minConnectRetry {
+ tick = minConnectRetry
+ }
+
+ addr := fsm.pConf.State.NeighborAddress
+ port := int(bgp.BGP_PORT)
+ if fsm.pConf.Transport.Config.RemotePort != 0 {
+ port = int(fsm.pConf.Transport.Config.RemotePort)
+ }
+ password := fsm.pConf.Config.AuthPassword
+ ttl := uint8(0)
+ ttlMin := uint8(0)
+
+ if fsm.pConf.TtlSecurity.Config.Enabled {
+ ttl = 255
+ ttlMin = fsm.pConf.TtlSecurity.Config.TtlMin
+ } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL {
+ ttl = 1
+ if fsm.pConf.EbgpMultihop.Config.Enabled {
+ ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl
+ }
+ }
+ return tick, addr, port, password, ttl, ttlMin, fsm.pConf.Transport.Config.LocalAddress
+ }()
+
for {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ timer := time.NewTimer(time.Duration(r.Intn(tick)+tick) * time.Second)
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debug("stop connect loop")
+ timer.Stop()
+ return
+ case <-timer.C:
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debug("try to connect")
+ }
+
+ laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(localAddress, "0"))
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Warnf("failed to resolve local address: %s", err)
+ }
+
+ if err == nil {
+ d := net.Dialer{
+ LocalAddr: laddr,
+ Timeout: time.Duration(tick-1) * time.Second,
+ Control: func(network, address string, c syscall.RawConn) error {
+ return dialerControl(network, address, c, ttl, ttlMin, password)
+ },
+ }
+
+ conn, err := d.DialContext(ctx, "tcp", net.JoinHostPort(addr, fmt.Sprintf("%d", port)))
+ select {
+ case <-ctx.Done():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debug("stop connect loop")
+ return
+ default:
+ }
+
+ if err == nil {
+ select {
+ case fsm.connCh <- conn:
+ return
+ default:
+ conn.Close()
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Warn("active conn is closed to avoid being blocked")
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debugf("failed to connect: %s", err)
+ }
+ }
+ }
+}
+
+func (h *fsmHandler) active(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
+ c, cancel := context.WithCancel(ctx)
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go h.connectLoop(c, &wg)
+
+ defer func() {
+ cancel()
+ wg.Wait()
+ }()
+
+ fsm := h.fsm
+ for {
+ select {
+ case <-ctx.Done():
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
if !ok {
@@ -1118,8 +1130,11 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
return fmsg, nil
}
-func (h *fsmHandler) recvMessage() error {
- defer h.msgCh.Close()
+func (h *fsmHandler) recvMessage(ctx context.Context, wg *sync.WaitGroup) error {
+ defer func() {
+ h.msgCh.Close()
+ wg.Done()
+ }()
fmsg, _ := h.recvMessageWithError()
if fmsg != nil {
h.msgCh.In() <- fmsg
@@ -1184,7 +1199,7 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode]
return capMap, negotiated
}
-func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
fsm.lock.RLock()
@@ -1201,7 +1216,10 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
h.conn = fsm.conn
fsm.lock.RUnlock()
- h.t.Go(h.recvMessage)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ defer wg.Wait()
+ go h.recvMessage(ctx, &wg)
// RFC 4271 P.60
// sets its HoldTimer to a large value
@@ -1213,7 +1231,7 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
@@ -1420,7 +1438,6 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
return bgp.BGP_FSM_IDLE, &err
case <-holdTimer.C:
m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired")
- h.t.Kill(nil)
return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmHoldTimerExpired, m, nil)
case stateOp := <-fsm.adminStateCh:
err := h.changeadminState(stateOp.State)
@@ -1457,14 +1474,17 @@ func keepaliveTicker(fsm *fsm) *time.Ticker {
return time.NewTicker(sec)
}
-func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
h.msgCh = channels.NewInfiniteChannel()
fsm.lock.RLock()
h.conn = fsm.conn
- h.t.Go(h.recvMessage)
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ wg.Add(1)
+ go h.recvMessage(ctx, &wg)
var holdTimer *time.Timer
if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
@@ -1478,7 +1498,7 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
@@ -1544,7 +1564,6 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
return bgp.BGP_FSM_IDLE, &err
case <-holdTimer.C:
m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired")
- h.t.Kill(nil)
return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmHoldTimerExpired, m, nil)
case stateOp := <-fsm.adminStateCh:
err := h.changeadminState(stateOp.State)
@@ -1566,7 +1585,8 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
}
}
-func (h *fsmHandler) sendMessageloop() error {
+func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) error {
+ defer wg.Done()
conn := h.conn
fsm := h.fsm
ticker := keepaliveTicker(fsm)
@@ -1678,27 +1698,31 @@ func (h *fsmHandler) sendMessageloop() error {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
return nil
case o := <-h.outgoing.Out():
- m := o.(*fsmOutgoingMsg)
- h.fsm.lock.RLock()
- options := h.fsm.marshallingOptions
- h.fsm.lock.RUnlock()
- for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
- if err := send(msg); err != nil {
- return nil
- }
- }
- if m.Notification != nil {
- if m.StayIdle {
- // current user is only prefix-limit
- // fix me if this is not the case
- h.changeadminState(adminStatePfxCt)
+ switch m := o.(type) {
+ case *fsmOutgoingMsg:
+ h.fsm.lock.RLock()
+ options := h.fsm.marshallingOptions
+ h.fsm.lock.RUnlock()
+ for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
+ if err := send(msg); err != nil {
+ return nil
+ }
}
- if err := send(m.Notification); err != nil {
- return nil
+ if m.Notification != nil {
+ if m.StayIdle {
+ // current user is only prefix-limit
+ // fix me if this is not the case
+ h.changeadminState(adminStatePfxCt)
+ }
+ if err := send(m.Notification); err != nil {
+ return nil
+ }
}
+ default:
+ return nil
}
case <-ticker.C:
if err := send(bgp.NewBGPKeepAliveMessage()); err != nil {
@@ -1708,7 +1732,8 @@ func (h *fsmHandler) sendMessageloop() error {
}
}
-func (h *fsmHandler) recvMessageloop() error {
+func (h *fsmHandler) recvMessageloop(ctx context.Context, wg *sync.WaitGroup) error {
+ defer wg.Done()
for {
fmsg, err := h.recvMessageWithError()
if fmsg != nil {
@@ -1720,14 +1745,19 @@ func (h *fsmHandler) recvMessageloop() error {
}
}
-func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
+ var wg sync.WaitGroup
fsm := h.fsm
fsm.lock.Lock()
h.conn = fsm.conn
fsm.lock.Unlock()
- h.t.Go(h.sendMessageloop)
+
+ defer wg.Wait()
+ wg.Add(2)
+
+ go h.sendMessageloop(ctx, &wg)
h.msgCh = h.incoming
- h.t.Go(h.recvMessageloop)
+ go h.recvMessageloop(ctx, &wg)
var holdTimer *time.Timer
if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
@@ -1742,7 +1772,8 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
+ h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
if !ok {
@@ -1758,7 +1789,12 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
fsm.lock.RUnlock()
case err := <-h.stateReasonCh:
h.conn.Close()
- h.t.Kill(nil)
+ // If the recv goroutine hits an error and sends it to
+ // stateReasonCh, the tx goroutine might take a long time to
+ // exit, because it waits for ctx.Done() or the keepalive
+ // timer. So kill it now.
+ h.outgoing.In() <- err
fsm.lock.RLock()
if s := fsm.pConf.GracefulRestart.State; s.Enabled &&
(s.NotificationEnabled && err.Type == fsmNotificationRecv ||
@@ -1804,43 +1840,38 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
}
}
-func (h *fsmHandler) loop() error {
+func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error {
+ defer wg.Done()
+
fsm := h.fsm
- ch := make(chan bgp.FSMState)
fsm.lock.RLock()
oldState := fsm.state
fsm.lock.RUnlock()
var reason *fsmStateReason
- f := func() error {
- nextState := bgp.FSMState(-1)
- fsm.lock.RLock()
- fsmState := fsm.state
- fsm.lock.RUnlock()
- switch fsmState {
- case bgp.BGP_FSM_IDLE:
- nextState, reason = h.idle()
- // case bgp.BGP_FSM_CONNECT:
- // nextState = h.connect()
- case bgp.BGP_FSM_ACTIVE:
- nextState, reason = h.active()
- case bgp.BGP_FSM_OPENSENT:
- nextState, reason = h.opensent()
- case bgp.BGP_FSM_OPENCONFIRM:
- nextState, reason = h.openconfirm()
- case bgp.BGP_FSM_ESTABLISHED:
- nextState, reason = h.established()
- }
- fsm.reason = reason
- ch <- nextState
- return nil
- }
-
- h.t.Go(f)
+ nextState := bgp.FSMState(-1)
+ fsm.lock.RLock()
+ fsmState := fsm.state
+ fsm.lock.RUnlock()
- nextState := <-ch
+ switch fsmState {
+ case bgp.BGP_FSM_IDLE:
+ nextState, reason = h.idle(ctx)
+ // case bgp.BGP_FSM_CONNECT:
+ // nextState = h.connect()
+ case bgp.BGP_FSM_ACTIVE:
+ nextState, reason = h.active(ctx)
+ case bgp.BGP_FSM_OPENSENT:
+ nextState, reason = h.opensent(ctx)
+ case bgp.BGP_FSM_OPENCONFIRM:
+ nextState, reason = h.openconfirm(ctx)
+ case bgp.BGP_FSM_ESTABLISHED:
+ nextState, reason = h.established(ctx)
+ }
fsm.lock.RLock()
+ fsm.reason = reason
+
if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -1867,13 +1898,7 @@ func (h *fsmHandler) loop() error {
}
fsm.lock.RUnlock()
- e := time.AfterFunc(time.Second*120, func() {
- log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState)
- })
- h.t.Wait()
- e.Stop()
-
- // under zero means that tomb.Dying()
+ // under zero means that the context was canceled.
if nextState >= bgp.BGP_FSM_IDLE {
fsm.lock.RLock()
h.stateCh <- &fsmMsg{
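
For reference, the rewritten connectLoop relies on net.Dialer.DialContext so that an in-flight dial is aborted as soon as the per-state context is canceled. A standalone sketch of that standard-library behavior (the address, port, and timings are illustrative, not GoBGP configuration):

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // Cancel while the dial may still be in flight; DialContext then
    // returns promptly with a "context canceled" error instead of
    // waiting for the full dialer timeout.
    go func() {
        time.Sleep(500 * time.Millisecond)
        cancel()
    }()

    d := net.Dialer{Timeout: 30 * time.Second}
    conn, err := d.DialContext(ctx, "tcp", "192.0.2.1:179") // TEST-NET-1 address, port 179 (BGP)
    if err != nil {
        fmt.Println("dial aborted:", err)
        return
    }
    defer conn.Close()
    fmt.Println("connected to", conn.RemoteAddr())
}

With the old plain Dial, a canceled loop could still leave a dial attempt blocked until its timeout expired; with DialContext, cancel unblocks the dial immediately.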