author    FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>  2018-12-03 21:04:08 +0900
committer FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>  2018-12-19 20:14:24 +0900
commit    46e375476196b6e670f6a627448471636dac69e6
tree      94be8d59e6c4d7acff71c1485e53075a302a4cc9
parent    598bba9fb2b761d1ad32776b0a61357500227de1
server: replace tomb with context
All we need is cancellation. Let's use the standard way to do it: context.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
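The essence of the change, as a minimal self-contained sketch (not code from this commit): a goroutine that tomb.Tomb used to launch and reap is started with go plus a sync.WaitGroup, cancelled through a context.Context, and reaped with wg.Wait(). Roughly, t.Go(f) becomes wg.Add(1) followed by go f(ctx, &wg), <-t.Dying() becomes <-ctx.Done(), and t.Kill(nil)/t.Wait() becomes cancel()/wg.Wait().

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker loops until its context is cancelled, then signals the
// WaitGroup on exit. The select on ctx.Done() replaces <-t.Dying().
func worker(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done(): // was: <-t.Dying()
			fmt.Println("worker: stopped")
			return
		case <-ticker.C:
			fmt.Println("worker: tick")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(ctx, &wg) // was: t.Go(worker)

	time.Sleep(350 * time.Millisecond)
	cancel()  // was: t.Kill(nil)
	wg.Wait() // was: t.Wait()
}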
-rw-r--r--  Gopkg.lock                   9
-rw-r--r--  Gopkg.toml                   4
-rw-r--r--  pkg/server/fsm.go          391
-rw-r--r--  pkg/server/fsm_test.go      11
-rw-r--r--  pkg/server/peer.go          29
-rw-r--r--  pkg/server/server_test.go  101
6 files changed, 276 insertions, 269 deletions
diff --git a/Gopkg.lock b/Gopkg.lock
index 47d291bf..5dcff56d 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -337,14 +337,6 @@
[[projects]]
branch = "v2"
- digest = "1:61a650a53e5e865a91ae9581f02990a4b6e3afcb8d280f19b1e67a3c284944e6"
- name = "gopkg.in/tomb.v2"
- packages = ["."]
- pruneopts = ""
- revision = "d5d1b5820637886def9eef33e03a27a9f166942c"
-
-[[projects]]
- branch = "v2"
digest = "1:f776026dedad7a9fef3c65180084620ca3684a87a177f5da8837755a1f8fa4fd"
name = "gopkg.in/yaml.v2"
packages = ["."]
@@ -377,7 +369,6 @@
"golang.org/x/net/context",
"google.golang.org/grpc",
"google.golang.org/grpc/credentials",
- "gopkg.in/tomb.v2",
]
solver-name = "gps-cdcl"
solver-version = 1
diff --git a/Gopkg.toml b/Gopkg.toml
index 72bab91b..442e34cf 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -76,7 +76,3 @@
[[constraint]]
name = "google.golang.org/grpc"
version = "1.5.1"
-
-[[constraint]]
- branch = "v2"
- name = "gopkg.in/tomb.v2"
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go
index c9198af1..2440f65a 100644
--- a/pkg/server/fsm.go
+++ b/pkg/server/fsm.go
@@ -16,6 +16,7 @@
package server
import (
+ "context"
"fmt"
"io"
"math/rand"
@@ -32,7 +33,6 @@ import (
"github.com/osrg/gobgp/pkg/packet/bmp"
log "github.com/sirupsen/logrus"
- "gopkg.in/tomb.v2"
)
type peerDownReason int
@@ -190,7 +190,6 @@ type adminStateOperation struct {
var fsmVersion uint
type fsm struct {
- t tomb.Tomb
gConf *config.Global
pConf *config.Neighbor
lock sync.RWMutex
@@ -202,7 +201,6 @@ type fsm struct {
opensentHoldTime float64
adminState adminState
adminStateCh chan adminStateOperation
- getActiveCh chan struct{}
h *fsmHandler
rfMap map[bgp.RouteFamily]bgp.BGPAddPathMode
capMap map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface
@@ -296,7 +294,6 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
opensentHoldTime: float64(holdtimeOpensent),
adminState: adminState,
adminStateCh: make(chan adminStateOperation, 1),
- getActiveCh: make(chan struct{}),
rfMap: make(map[bgp.RouteFamily]bgp.BGPAddPathMode),
capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface),
peerInfo: table.NewPeerInfo(gConf, pConf),
@@ -305,7 +302,6 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
version: fsmVersion,
}
fsm.gracefulRestartTimer.Stop()
- fsm.t.Go(fsm.connectLoop)
return fsm
}
@@ -343,11 +339,6 @@ func (fsm *fsm) StateChange(nextState bgp.FSMState) {
if !y {
fsm.twoByteAsTrans = true
}
- case bgp.BGP_FSM_ACTIVE:
- if !fsm.pConf.Transport.Config.PassiveMode {
- fsm.getActiveCh <- struct{}{}
- }
- fallthrough
default:
fsm.pConf.Timers.State.Downtime = time.Now().Unix()
}
@@ -403,106 +394,7 @@ func (fsm *fsm) sendNotification(code, subType uint8, data []byte, msg string) (
return fsm.sendNotificationFromErrorMsg(e.(*bgp.MessageError))
}
-func (fsm *fsm) connectLoop() error {
- fsm.lock.RLock()
- tick := int(fsm.pConf.Timers.Config.ConnectRetry)
- fsm.lock.RUnlock()
- if tick < minConnectRetry {
- tick = minConnectRetry
- }
-
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
-
- timer := time.NewTimer(time.Duration(tick) * time.Second)
- timer.Stop()
-
- connect := func() {
- fsm.lock.RLock()
- defer fsm.lock.RUnlock()
-
- addr := fsm.pConf.State.NeighborAddress
- port := int(bgp.BGP_PORT)
- if fsm.pConf.Transport.Config.RemotePort != 0 {
- port = int(fsm.pConf.Transport.Config.RemotePort)
- }
- laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(fsm.pConf.Transport.Config.LocalAddress, "0"))
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Warnf("failed to resolve local address: %s", err)
- return
- }
- password := fsm.pConf.Config.AuthPassword
- ttl := uint8(0)
- ttlMin := uint8(0)
-
- if fsm.pConf.TtlSecurity.Config.Enabled {
- ttl = 255
- ttlMin = fsm.pConf.TtlSecurity.Config.TtlMin
- } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL {
- ttl = 1
- if fsm.pConf.EbgpMultihop.Config.Enabled {
- ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl
- }
- }
-
- d := net.Dialer{
- LocalAddr: laddr,
- Timeout: time.Duration(minConnectRetry-1) * time.Second,
- Control: func(network, address string, c syscall.RawConn) error {
- return dialerControl(network, address, c, ttl, ttlMin, password)
- },
- }
- conn, err := d.Dial("tcp", net.JoinHostPort(addr, fmt.Sprintf("%d", port)))
- if err == nil {
- select {
- case fsm.connCh <- conn:
- return
- default:
- conn.Close()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Warn("active conn is closed to avoid being blocked")
- }
- } else {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Debugf("failed to connect: %s", err)
- }
-
- if fsm.state == bgp.BGP_FSM_ACTIVE {
- timer.Reset(time.Duration(tick) * time.Second)
- }
- }
-
- for {
- select {
- case <-fsm.t.Dying():
- fsm.lock.RLock()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": fsm.pConf.State.NeighborAddress,
- }).Debug("stop connect loop")
- fsm.lock.RUnlock()
- return nil
- case <-timer.C:
- fsm.lock.RLock()
- ready := fsm.state == bgp.BGP_FSM_ACTIVE
- fsm.lock.RUnlock()
- if ready {
- go connect()
- }
- case <-fsm.getActiveCh:
- timer.Reset(time.Duration(r.Intn(minConnectRetry)+minConnectRetry) * time.Second)
- }
- }
-}
-
type fsmHandler struct {
- t tomb.Tomb
fsm *fsm
conn net.Conn
msgCh *channels.InfiniteChannel
@@ -512,9 +404,13 @@ type fsmHandler struct {
outgoing *channels.InfiniteChannel
holdTimerResetCh chan bool
sentNotification *bgp.BGPMessage
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ wg *sync.WaitGroup
}
func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *fsmMsg, outgoing *channels.InfiniteChannel) *fsmHandler {
+ ctx, cancel := context.WithCancel(context.Background())
h := &fsmHandler{
fsm: fsm,
stateReasonCh: make(chan fsmStateReason, 2),
@@ -522,12 +418,16 @@ func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *f
stateCh: stateCh,
outgoing: outgoing,
holdTimerResetCh: make(chan bool, 2),
+ wg: &sync.WaitGroup{},
+ ctx: ctx,
+ ctxCancel: cancel,
}
- fsm.t.Go(h.loop)
+ h.wg.Add(1)
+ go h.loop(ctx, h.wg)
return h
}
-func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) idle(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
fsm.lock.RLock()
@@ -536,7 +436,7 @@ func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
return -1, newfsmStateReason(fsmDying, nil, nil)
case <-fsm.gracefulRestartTimer.C:
fsm.lock.RLock()
@@ -605,11 +505,123 @@ func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) {
}
}
-func (h *fsmHandler) active() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) connectLoop(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
fsm := h.fsm
+
+ tick, addr, port, password, ttl, ttlMin, localAddress := func() (int, string, int, string, uint8, uint8, string) {
+ fsm.lock.RLock()
+ defer fsm.lock.RUnlock()
+
+ tick := int(fsm.pConf.Timers.Config.ConnectRetry)
+ if tick < minConnectRetry {
+ tick = minConnectRetry
+ }
+
+ addr := fsm.pConf.State.NeighborAddress
+ port := int(bgp.BGP_PORT)
+ if fsm.pConf.Transport.Config.RemotePort != 0 {
+ port = int(fsm.pConf.Transport.Config.RemotePort)
+ }
+ password := fsm.pConf.Config.AuthPassword
+ ttl := uint8(0)
+ ttlMin := uint8(0)
+
+ if fsm.pConf.TtlSecurity.Config.Enabled {
+ ttl = 255
+ ttlMin = fsm.pConf.TtlSecurity.Config.TtlMin
+ } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL {
+ ttl = 1
+ if fsm.pConf.EbgpMultihop.Config.Enabled {
+ ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl
+ }
+ }
+ return tick, addr, port, password, ttl, ttlMin, fsm.pConf.Transport.Config.LocalAddress
+ }()
+
for {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ timer := time.NewTimer(time.Duration(r.Intn(tick)+tick) * time.Second)
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debug("stop connect loop")
+ timer.Stop()
+ return
+ case <-timer.C:
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debug("try to connect")
+ }
+
+ laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(localAddress, "0"))
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Warnf("failed to resolve local address: %s", err)
+ }
+
+ if err == nil {
+ d := net.Dialer{
+ LocalAddr: laddr,
+ Timeout: time.Duration(tick-1) * time.Second,
+ Control: func(network, address string, c syscall.RawConn) error {
+ return dialerControl(network, address, c, ttl, ttlMin, password)
+ },
+ }
+
+ conn, err := d.DialContext(ctx, "tcp", net.JoinHostPort(addr, fmt.Sprintf("%d", port)))
+ select {
+ case <-ctx.Done():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debug("stop connect loop")
+ return
+ default:
+ }
+
+ if err == nil {
+ select {
+ case fsm.connCh <- conn:
+ return
+ default:
+ conn.Close()
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Warn("active conn is closed to avoid being blocked")
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ }).Debugf("failed to connect: %s", err)
+ }
+ }
+ }
+}
+
+func (h *fsmHandler) active(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
+ c, cancel := context.WithCancel(ctx)
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go h.connectLoop(c, &wg)
+
+ defer func() {
+ cancel()
+ wg.Wait()
+ }()
+
+ fsm := h.fsm
+ for {
+ select {
+ case <-ctx.Done():
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
if !ok {
@@ -1118,8 +1130,11 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
return fmsg, nil
}
-func (h *fsmHandler) recvMessage() error {
- defer h.msgCh.Close()
+func (h *fsmHandler) recvMessage(ctx context.Context, wg *sync.WaitGroup) error {
+ defer func() {
+ h.msgCh.Close()
+ wg.Done()
+ }()
fmsg, _ := h.recvMessageWithError()
if fmsg != nil {
h.msgCh.In() <- fmsg
@@ -1184,7 +1199,7 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode]
return capMap, negotiated
}
-func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
fsm.lock.RLock()
@@ -1201,7 +1216,10 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
h.conn = fsm.conn
fsm.lock.RUnlock()
- h.t.Go(h.recvMessage)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ defer wg.Wait()
+ go h.recvMessage(ctx, &wg)
// RFC 4271 P.60
// sets its HoldTimer to a large value
@@ -1213,7 +1231,7 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
@@ -1420,7 +1438,6 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) {
return bgp.BGP_FSM_IDLE, &err
case <-holdTimer.C:
m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired")
- h.t.Kill(nil)
return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmHoldTimerExpired, m, nil)
case stateOp := <-fsm.adminStateCh:
err := h.changeadminState(stateOp.State)
@@ -1457,14 +1474,17 @@ func keepaliveTicker(fsm *fsm) *time.Ticker {
return time.NewTicker(sec)
}
-func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
h.msgCh = channels.NewInfiniteChannel()
fsm.lock.RLock()
h.conn = fsm.conn
- h.t.Go(h.recvMessage)
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ wg.Add(1)
+ go h.recvMessage(ctx, &wg)
var holdTimer *time.Timer
if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
@@ -1478,7 +1498,7 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
@@ -1544,7 +1564,6 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
return bgp.BGP_FSM_IDLE, &err
case <-holdTimer.C:
m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired")
- h.t.Kill(nil)
return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmHoldTimerExpired, m, nil)
case stateOp := <-fsm.adminStateCh:
err := h.changeadminState(stateOp.State)
@@ -1566,7 +1585,8 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) {
}
}
-func (h *fsmHandler) sendMessageloop() error {
+func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) error {
+ defer wg.Done()
conn := h.conn
fsm := h.fsm
ticker := keepaliveTicker(fsm)
@@ -1678,27 +1698,31 @@ func (h *fsmHandler) sendMessageloop() error {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
return nil
case o := <-h.outgoing.Out():
- m := o.(*fsmOutgoingMsg)
- h.fsm.lock.RLock()
- options := h.fsm.marshallingOptions
- h.fsm.lock.RUnlock()
- for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
- if err := send(msg); err != nil {
- return nil
- }
- }
- if m.Notification != nil {
- if m.StayIdle {
- // current user is only prefix-limit
- // fix me if this is not the case
- h.changeadminState(adminStatePfxCt)
+ switch m := o.(type) {
+ case *fsmOutgoingMsg:
+ h.fsm.lock.RLock()
+ options := h.fsm.marshallingOptions
+ h.fsm.lock.RUnlock()
+ for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
+ if err := send(msg); err != nil {
+ return nil
+ }
}
- if err := send(m.Notification); err != nil {
- return nil
+ if m.Notification != nil {
+ if m.StayIdle {
+ // current user is only prefix-limit
+ // fix me if this is not the case
+ h.changeadminState(adminStatePfxCt)
+ }
+ if err := send(m.Notification); err != nil {
+ return nil
+ }
}
+ default:
+ return nil
}
case <-ticker.C:
if err := send(bgp.NewBGPKeepAliveMessage()); err != nil {
@@ -1708,7 +1732,8 @@ func (h *fsmHandler) sendMessageloop() error {
}
}
-func (h *fsmHandler) recvMessageloop() error {
+func (h *fsmHandler) recvMessageloop(ctx context.Context, wg *sync.WaitGroup) error {
+ defer wg.Done()
for {
fmsg, err := h.recvMessageWithError()
if fmsg != nil {
@@ -1720,14 +1745,19 @@ func (h *fsmHandler) recvMessageloop() error {
}
}
-func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
+func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
+ var wg sync.WaitGroup
fsm := h.fsm
fsm.lock.Lock()
h.conn = fsm.conn
fsm.lock.Unlock()
- h.t.Go(h.sendMessageloop)
+
+ defer wg.Wait()
+ wg.Add(2)
+
+ go h.sendMessageloop(ctx, &wg)
h.msgCh = h.incoming
- h.t.Go(h.recvMessageloop)
+ go h.recvMessageloop(ctx, &wg)
var holdTimer *time.Timer
if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
@@ -1742,7 +1772,8 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
for {
select {
- case <-h.t.Dying():
+ case <-ctx.Done():
+ h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
if !ok {
@@ -1758,7 +1789,12 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
fsm.lock.RUnlock()
case err := <-h.stateReasonCh:
h.conn.Close()
- h.t.Kill(nil)
+			// If the recv goroutine hit an error and sent it to
+			// stateReasonCh, the tx goroutine might still take a
+			// long time to exit because it waits for ctx.Done()
+			// or the keepalive timer. So kill it now.
+ h.outgoing.In() <- err
fsm.lock.RLock()
if s := fsm.pConf.GracefulRestart.State; s.Enabled &&
(s.NotificationEnabled && err.Type == fsmNotificationRecv ||
@@ -1804,43 +1840,38 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) {
}
}
-func (h *fsmHandler) loop() error {
+func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error {
+ defer wg.Done()
+
fsm := h.fsm
- ch := make(chan bgp.FSMState)
fsm.lock.RLock()
oldState := fsm.state
fsm.lock.RUnlock()
var reason *fsmStateReason
- f := func() error {
- nextState := bgp.FSMState(-1)
- fsm.lock.RLock()
- fsmState := fsm.state
- fsm.lock.RUnlock()
- switch fsmState {
- case bgp.BGP_FSM_IDLE:
- nextState, reason = h.idle()
- // case bgp.BGP_FSM_CONNECT:
- // nextState = h.connect()
- case bgp.BGP_FSM_ACTIVE:
- nextState, reason = h.active()
- case bgp.BGP_FSM_OPENSENT:
- nextState, reason = h.opensent()
- case bgp.BGP_FSM_OPENCONFIRM:
- nextState, reason = h.openconfirm()
- case bgp.BGP_FSM_ESTABLISHED:
- nextState, reason = h.established()
- }
- fsm.reason = reason
- ch <- nextState
- return nil
- }
-
- h.t.Go(f)
+ nextState := bgp.FSMState(-1)
+ fsm.lock.RLock()
+ fsmState := fsm.state
+ fsm.lock.RUnlock()
- nextState := <-ch
+ switch fsmState {
+ case bgp.BGP_FSM_IDLE:
+ nextState, reason = h.idle(ctx)
+ // case bgp.BGP_FSM_CONNECT:
+ // nextState = h.connect()
+ case bgp.BGP_FSM_ACTIVE:
+ nextState, reason = h.active(ctx)
+ case bgp.BGP_FSM_OPENSENT:
+ nextState, reason = h.opensent(ctx)
+ case bgp.BGP_FSM_OPENCONFIRM:
+ nextState, reason = h.openconfirm(ctx)
+ case bgp.BGP_FSM_ESTABLISHED:
+ nextState, reason = h.established(ctx)
+ }
fsm.lock.RLock()
+ fsm.reason = reason
+
if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -1867,13 +1898,7 @@ func (h *fsmHandler) loop() error {
}
fsm.lock.RUnlock()
- e := time.AfterFunc(time.Second*120, func() {
- log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState)
- })
- h.t.Wait()
- e.Stop()
-
- // under zero means that tomb.Dying()
+	// a next state under zero means that the context was canceled.
if nextState >= bgp.BGP_FSM_IDLE {
fsm.lock.RLock()
h.stateCh <- &fsmMsg{
diff --git a/pkg/server/fsm_test.go b/pkg/server/fsm_test.go
index 1e245a7e..cd3eb7e2 100644
--- a/pkg/server/fsm_test.go
+++ b/pkg/server/fsm_test.go
@@ -16,6 +16,7 @@
package server
import (
+ "context"
"errors"
"net"
"strconv"
@@ -181,7 +182,7 @@ func TestFSMHandlerOpensent_HoldTimerExpired(t *testing.T) {
// set holdtime
p.fsm.opensentHoldTime = 2
- state, _ := h.opensent()
+ state, _ := h.opensent(context.Background())
assert.Equal(bgp.BGP_FSM_IDLE, state)
lastMsg := m.sendBuf[len(m.sendBuf)-1]
@@ -206,7 +207,7 @@ func TestFSMHandlerOpenconfirm_HoldTimerExpired(t *testing.T) {
// set holdtime
p.fsm.pConf.Timers.State.NegotiatedHoldTime = 2
- state, _ := h.openconfirm()
+ state, _ := h.openconfirm(context.Background())
assert.Equal(bgp.BGP_FSM_IDLE, state)
lastMsg := m.sendBuf[len(m.sendBuf)-1]
@@ -244,7 +245,7 @@ func TestFSMHandlerEstablish_HoldTimerExpired(t *testing.T) {
p.fsm.pConf.Timers.State.NegotiatedHoldTime = 2
go pushPackets()
- state, _ := h.established()
+ state, _ := h.established(context.Background())
time.Sleep(time.Second * 1)
assert.Equal(bgp.BGP_FSM_IDLE, state)
m.mtx.Lock()
@@ -270,7 +271,7 @@ func TestFSMHandlerOpenconfirm_HoldtimeZero(t *testing.T) {
p.fsm.pConf.Timers.Config.KeepaliveInterval = 1
// set holdtime
p.fsm.pConf.Timers.State.NegotiatedHoldTime = 0
- go h.openconfirm()
+ go h.openconfirm(context.Background())
time.Sleep(100 * time.Millisecond)
@@ -292,7 +293,7 @@ func TestFSMHandlerEstablished_HoldtimeZero(t *testing.T) {
// set holdtime
p.fsm.pConf.Timers.State.NegotiatedHoldTime = 0
- go h.established()
+ go h.established(context.Background())
time.Sleep(100 * time.Millisecond)
diff --git a/pkg/server/peer.go b/pkg/server/peer.go
index 83a5d12f..4e51137a 100644
--- a/pkg/server/peer.go
+++ b/pkg/server/peer.go
@@ -29,10 +29,11 @@ import (
)
const (
- flopThreshold = time.Second * 30
- minConnectRetry = 10
+ flopThreshold = time.Second * 30
)
+var minConnectRetry = 10
+
type peerGroup struct {
Conf *config.PeerGroup
members map[string]config.Neighbor
@@ -560,6 +561,7 @@ func (peer *peer) stopFSM() error {
failed := false
peer.fsm.lock.RLock()
addr := peer.fsm.pConf.State.NeighborAddress
+ state := peer.fsm.state
peer.fsm.lock.RUnlock()
t1 := time.AfterFunc(time.Minute*5, func() {
log.WithFields(log.Fields{
@@ -567,33 +569,16 @@ func (peer *peer) stopFSM() error {
}).Warnf("Failed to free the fsm.h.t for %s", addr)
failed = true
})
-
- peer.fsm.h.t.Kill(nil)
- peer.fsm.h.t.Wait()
+ peer.fsm.h.ctxCancel()
+ peer.fsm.h.wg.Wait()
t1.Stop()
if !failed {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": addr,
+ "State": state,
}).Debug("freed fsm.h.t")
cleanInfiniteChannel(peer.outgoing)
}
- failed = false
- t2 := time.AfterFunc(time.Minute*5, func() {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Warnf("Failed to free the fsm.t for %s", addr)
- failed = true
- })
- peer.fsm.t.Kill(nil)
- peer.fsm.t.Wait()
- t2.Stop()
- if !failed {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- }).Debug("freed fsm.t")
- return nil
- }
return fmt.Errorf("Failed to free FSM for %s", addr)
}
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index c8975e41..3bd47304 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -169,6 +169,8 @@ func TestListPolicyAssignment(t *testing.T) {
}
func TestMonitor(test *testing.T) {
+ minConnectRetry = 5
+
assert := assert.New(test)
s := NewBgpServer()
go s.Serve()
@@ -220,7 +222,7 @@ func TestMonitor(test *testing.T) {
},
Timers: config.Timers{
Config: config.TimersConfig{
- ConnectRetry: 10,
+ ConnectRetry: 5,
},
},
}
@@ -526,6 +528,8 @@ func TestFilterpathWithRejectPolicy(t *testing.T) {
}
func TestPeerGroup(test *testing.T) {
+ minConnectRetry = 5
+
assert := assert.New(test)
log.SetLevel(log.DebugLevel)
s := NewBgpServer()
@@ -545,6 +549,11 @@ func TestPeerGroup(test *testing.T) {
PeerAs: 2,
PeerGroupName: "g",
},
+ Timers: config.Timers{
+ Config: config.TimersConfig{
+ ConnectRetry: 5,
+ },
+ },
}
err = s.addPeerGroup(g)
assert.Nil(err)
@@ -559,6 +568,11 @@ func TestPeerGroup(test *testing.T) {
PassiveMode: true,
},
},
+ Timers: config.Timers{
+ Config: config.TimersConfig{
+ ConnectRetry: 5,
+ },
+ },
}
configured := map[string]interface{}{
"config": map[string]interface{}{
@@ -599,7 +613,7 @@ func TestPeerGroup(test *testing.T) {
},
Timers: config.Timers{
Config: config.TimersConfig{
- ConnectRetry: 10,
+ ConnectRetry: 5,
},
},
}
@@ -615,6 +629,8 @@ func TestPeerGroup(test *testing.T) {
}
func TestDynamicNeighbor(t *testing.T) {
+ minConnectRetry = 5
+
assert := assert.New(t)
log.SetLevel(log.DebugLevel)
s1 := NewBgpServer()
@@ -634,6 +650,11 @@ func TestDynamicNeighbor(t *testing.T) {
PeerAs: 2,
PeerGroupName: "g",
},
+ Timers: config.Timers{
+ Config: config.TimersConfig{
+ ConnectRetry: 5,
+ },
+ },
}
err = s1.addPeerGroup(g)
assert.Nil(err)
@@ -671,7 +692,7 @@ func TestDynamicNeighbor(t *testing.T) {
},
Timers: config.Timers{
Config: config.TimersConfig{
- ConnectRetry: 10,
+ ConnectRetry: 5,
},
},
}
@@ -688,6 +709,8 @@ func TestDynamicNeighbor(t *testing.T) {
}
func TestGracefulRestartTimerExpired(t *testing.T) {
+ minConnectRetry = 5
+
assert := assert.New(t)
s1 := NewBgpServer()
go s1.Serve()
@@ -714,7 +737,7 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
GracefulRestart: config.GracefulRestart{
Config: config.GracefulRestartConfig{
Enabled: true,
- RestartTime: 1,
+ RestartTime: 5,
},
},
}
@@ -751,7 +774,7 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
},
Timers: config.Timers{
Config: config.TimersConfig{
- ConnectRetry: 10,
+ ConnectRetry: 5,
},
},
}
@@ -831,30 +854,19 @@ func TestFamiliesForSoftreset(t *testing.T) {
assert.NotContains(t, families, bgp.RF_RTC_UC)
}
-func runNewServer(ctx context.Context, as uint32, routerID string, listenPort int32) (*BgpServer, context.CancelFunc, error) {
+func runNewServer(as uint32, routerID string, listenPort int32) *BgpServer {
s := NewBgpServer()
- ctxInner, cancelInner := context.WithCancel(ctx)
go s.Serve()
- go func() {
- <-ctxInner.Done()
- stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- if err := s.StopBgp(stopCtx, &api.StopBgpRequest{}); err != nil {
- log.Fatalf("Failed to stop server %s: %s", s.bgpConfig.Global.Config.RouterId, err)
- }
- cancel()
- }()
-
- err := s.StartBgp(ctx, &api.StartBgpRequest{
+ if err := s.StartBgp(context.Background(), &api.StartBgpRequest{
Global: &api.Global{
As: as,
RouterId: routerID,
ListenPort: listenPort,
},
- })
- if err != nil {
- s = nil
+ }); err != nil {
+ log.Fatalf("Failed to start server %s: %s", s.bgpConfig.Global.Config.RouterId, err)
}
- return s, cancelInner, err
+ return s
}
func peerServers(t *testing.T, ctx context.Context, servers []*BgpServer, families []config.AfiSafiType) error {
@@ -875,6 +887,11 @@ func peerServers(t *testing.T, ctx context.Context, servers []*BgpServer, famili
RemotePort: uint16(peer.bgpConfig.Global.Config.Port),
},
},
+ Timers: config.Timers{
+ Config: config.TimersConfig{
+ ConnectRetry: 5,
+ },
+ },
}
// first server to get neighbor config is passive to hopefully make handshake faster
@@ -913,7 +930,7 @@ func parseRDRT(rdStr string) (bgp.RouteDistinguisherInterface, bgp.ExtendedCommu
return rd, rt, nil
}
-func addVrf(t *testing.T, ctx context.Context, s *BgpServer, vrfName, rdStr string, id uint32) {
+func addVrf(t *testing.T, s *BgpServer, vrfName, rdStr string, id uint32) {
rd, rt, err := parseRDRT(rdStr)
if err != nil {
t.Fatal(err)
@@ -928,34 +945,24 @@ func addVrf(t *testing.T, ctx context.Context, s *BgpServer, vrfName, rdStr stri
Id: id,
},
}
- if err = s.AddVrf(ctx, req); err != nil {
+ if err = s.AddVrf(context.Background(), req); err != nil {
t.Fatal(err)
}
}
func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
- // missing it may cause Test_DialTCP_FDleak to fail
- defer time.Sleep(time.Second)
+ minConnectRetry = 5
- ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
- defer cancel()
+ ctx := context.Background()
log.SetLevel(log.DebugLevel)
- s1, cf1, err := runNewServer(ctx, 1, "1.1.1.1", 10179)
- if err != nil {
- t.Fatal(err)
- }
- defer func() { cf1() }()
- s2, cf2, err := runNewServer(ctx, 1, "2.2.2.2", 20179)
- if err != nil {
- t.Fatal(err)
- }
- defer func() { cf2() }()
+ s1 := runNewServer(1, "1.1.1.1", 10179)
+ s2 := runNewServer(1, "2.2.2.2", 20179)
- addVrf(t, ctx, s1, "vrf1", "111:111", 1)
- addVrf(t, ctx, s2, "vrf1", "111:111", 1)
+ addVrf(t, s1, "vrf1", "111:111", 1)
+ addVrf(t, s2, "vrf1", "111:111", 1)
- if err = peerServers(t, ctx, []*BgpServer{s1, s2}, []config.AfiSafiType{config.AFI_SAFI_TYPE_L3VPN_IPV4_UNICAST, config.AFI_SAFI_TYPE_RTC}); err != nil {
+ if err := peerServers(t, ctx, []*BgpServer{s1, s2}, []config.AfiSafiType{config.AFI_SAFI_TYPE_L3VPN_IPV4_UNICAST, config.AFI_SAFI_TYPE_RTC}); err != nil {
t.Fatal(err)
}
watcher := s1.watch(watchUpdate(true))
@@ -977,7 +984,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
}
// s1 should receive this route from s2
-
+ t1 := time.NewTimer(time.Duration(30 * time.Second))
for found := false; !found; {
select {
case ev := <-watcher.Event():
@@ -995,10 +1002,11 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
}
}
}
- case <-ctx.Done():
+ case <-t1.C:
t.Fatalf("timeout while waiting for update path event")
}
}
+ t1.Stop()
// fabricate duplicated rtc message from s1
// s2 should not send vpn route again
@@ -1020,7 +1028,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
s1Peer := s2.neighborMap["127.0.0.1"]
s2.propagateUpdate(s1Peer, []*table.Path{rtcPath})
- awaitUpdateCtx, cancel := context.WithTimeout(ctx, time.Second)
+ t2 := time.NewTimer(time.Duration(2 * time.Second))
for done := false; !done; {
select {
case ev := <-watcher.Event():
@@ -1033,11 +1041,12 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) {
}
}
}
- //case <-timer.C:
- case <-awaitUpdateCtx.Done():
+ case <-t2.C:
log.Infof("await update done")
done = true
}
}
- cancel()
+
+ s1.StopBgp(context.Background(), &api.StopBgpRequest{})
+ s2.StopBgp(context.Background(), &api.StopBgpRequest{})
}