From 46e375476196b6e670f6a627448471636dac69e6 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Mon, 3 Dec 2018 21:04:08 +0900 Subject: server: replace tomb with context All what we need is cancelling. Let's use the standard way for it, context. Signed-off-by: FUJITA Tomonori --- pkg/server/fsm.go | 391 ++++++++++++++++++++++++---------------------- pkg/server/fsm_test.go | 11 +- pkg/server/peer.go | 29 +--- pkg/server/server_test.go | 101 ++++++------ 4 files changed, 276 insertions(+), 256 deletions(-) (limited to 'pkg') diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go index c9198af1..2440f65a 100644 --- a/pkg/server/fsm.go +++ b/pkg/server/fsm.go @@ -16,6 +16,7 @@ package server import ( + "context" "fmt" "io" "math/rand" @@ -32,7 +33,6 @@ import ( "github.com/osrg/gobgp/pkg/packet/bmp" log "github.com/sirupsen/logrus" - "gopkg.in/tomb.v2" ) type peerDownReason int @@ -190,7 +190,6 @@ type adminStateOperation struct { var fsmVersion uint type fsm struct { - t tomb.Tomb gConf *config.Global pConf *config.Neighbor lock sync.RWMutex @@ -202,7 +201,6 @@ type fsm struct { opensentHoldTime float64 adminState adminState adminStateCh chan adminStateOperation - getActiveCh chan struct{} h *fsmHandler rfMap map[bgp.RouteFamily]bgp.BGPAddPathMode capMap map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface @@ -296,7 +294,6 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP opensentHoldTime: float64(holdtimeOpensent), adminState: adminState, adminStateCh: make(chan adminStateOperation, 1), - getActiveCh: make(chan struct{}), rfMap: make(map[bgp.RouteFamily]bgp.BGPAddPathMode), capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface), peerInfo: table.NewPeerInfo(gConf, pConf), @@ -305,7 +302,6 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP version: fsmVersion, } fsm.gracefulRestartTimer.Stop() - fsm.t.Go(fsm.connectLoop) return fsm } @@ -343,11 +339,6 @@ func (fsm *fsm) StateChange(nextState bgp.FSMState) { if !y { fsm.twoByteAsTrans = true } - case bgp.BGP_FSM_ACTIVE: - if !fsm.pConf.Transport.Config.PassiveMode { - fsm.getActiveCh <- struct{}{} - } - fallthrough default: fsm.pConf.Timers.State.Downtime = time.Now().Unix() } @@ -403,106 +394,7 @@ func (fsm *fsm) sendNotification(code, subType uint8, data []byte, msg string) ( return fsm.sendNotificationFromErrorMsg(e.(*bgp.MessageError)) } -func (fsm *fsm) connectLoop() error { - fsm.lock.RLock() - tick := int(fsm.pConf.Timers.Config.ConnectRetry) - fsm.lock.RUnlock() - if tick < minConnectRetry { - tick = minConnectRetry - } - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - - timer := time.NewTimer(time.Duration(tick) * time.Second) - timer.Stop() - - connect := func() { - fsm.lock.RLock() - defer fsm.lock.RUnlock() - - addr := fsm.pConf.State.NeighborAddress - port := int(bgp.BGP_PORT) - if fsm.pConf.Transport.Config.RemotePort != 0 { - port = int(fsm.pConf.Transport.Config.RemotePort) - } - laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(fsm.pConf.Transport.Config.LocalAddress, "0")) - if err != nil { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": addr, - }).Warnf("failed to resolve local address: %s", err) - return - } - password := fsm.pConf.Config.AuthPassword - ttl := uint8(0) - ttlMin := uint8(0) - - if fsm.pConf.TtlSecurity.Config.Enabled { - ttl = 255 - ttlMin = fsm.pConf.TtlSecurity.Config.TtlMin - } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { - ttl = 1 - if fsm.pConf.EbgpMultihop.Config.Enabled { - ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl - } - } - - d := net.Dialer{ - LocalAddr: laddr, - Timeout: time.Duration(minConnectRetry-1) * time.Second, - Control: func(network, address string, c syscall.RawConn) error { - return dialerControl(network, address, c, ttl, ttlMin, password) - }, - } - conn, err := d.Dial("tcp", net.JoinHostPort(addr, fmt.Sprintf("%d", port))) - if err == nil { - select { - case fsm.connCh <- conn: - return - default: - conn.Close() - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": addr, - }).Warn("active conn is closed to avoid being blocked") - } - } else { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": addr, - }).Debugf("failed to connect: %s", err) - } - - if fsm.state == bgp.BGP_FSM_ACTIVE { - timer.Reset(time.Duration(tick) * time.Second) - } - } - - for { - select { - case <-fsm.t.Dying(): - fsm.lock.RLock() - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": fsm.pConf.State.NeighborAddress, - }).Debug("stop connect loop") - fsm.lock.RUnlock() - return nil - case <-timer.C: - fsm.lock.RLock() - ready := fsm.state == bgp.BGP_FSM_ACTIVE - fsm.lock.RUnlock() - if ready { - go connect() - } - case <-fsm.getActiveCh: - timer.Reset(time.Duration(r.Intn(minConnectRetry)+minConnectRetry) * time.Second) - } - } -} - type fsmHandler struct { - t tomb.Tomb fsm *fsm conn net.Conn msgCh *channels.InfiniteChannel @@ -512,9 +404,13 @@ type fsmHandler struct { outgoing *channels.InfiniteChannel holdTimerResetCh chan bool sentNotification *bgp.BGPMessage + ctx context.Context + ctxCancel context.CancelFunc + wg *sync.WaitGroup } func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *fsmMsg, outgoing *channels.InfiniteChannel) *fsmHandler { + ctx, cancel := context.WithCancel(context.Background()) h := &fsmHandler{ fsm: fsm, stateReasonCh: make(chan fsmStateReason, 2), @@ -522,12 +418,16 @@ func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *f stateCh: stateCh, outgoing: outgoing, holdTimerResetCh: make(chan bool, 2), + wg: &sync.WaitGroup{}, + ctx: ctx, + ctxCancel: cancel, } - fsm.t.Go(h.loop) + h.wg.Add(1) + go h.loop(ctx, h.wg) return h } -func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) { +func (h *fsmHandler) idle(ctx context.Context) (bgp.FSMState, *fsmStateReason) { fsm := h.fsm fsm.lock.RLock() @@ -536,7 +436,7 @@ func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) { for { select { - case <-h.t.Dying(): + case <-ctx.Done(): return -1, newfsmStateReason(fsmDying, nil, nil) case <-fsm.gracefulRestartTimer.C: fsm.lock.RLock() @@ -605,11 +505,123 @@ func (h *fsmHandler) idle() (bgp.FSMState, *fsmStateReason) { } } -func (h *fsmHandler) active() (bgp.FSMState, *fsmStateReason) { +func (h *fsmHandler) connectLoop(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() fsm := h.fsm + + tick, addr, port, password, ttl, ttlMin, localAddress := func() (int, string, int, string, uint8, uint8, string) { + fsm.lock.RLock() + defer fsm.lock.RUnlock() + + tick := int(fsm.pConf.Timers.Config.ConnectRetry) + if tick < minConnectRetry { + tick = minConnectRetry + } + + addr := fsm.pConf.State.NeighborAddress + port := int(bgp.BGP_PORT) + if fsm.pConf.Transport.Config.RemotePort != 0 { + port = int(fsm.pConf.Transport.Config.RemotePort) + } + password := fsm.pConf.Config.AuthPassword + ttl := uint8(0) + ttlMin := uint8(0) + + if fsm.pConf.TtlSecurity.Config.Enabled { + ttl = 255 + ttlMin = fsm.pConf.TtlSecurity.Config.TtlMin + } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { + ttl = 1 + if fsm.pConf.EbgpMultihop.Config.Enabled { + ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl + } + } + return tick, addr, port, password, ttl, ttlMin, fsm.pConf.Transport.Config.LocalAddress + }() + for { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + timer := time.NewTimer(time.Duration(r.Intn(tick)+tick) * time.Second) select { - case <-h.t.Dying(): + case <-ctx.Done(): + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + }).Debug("stop connect loop") + timer.Stop() + return + case <-timer.C: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + }).Debug("try to connect") + } + + laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(localAddress, "0")) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + }).Warnf("failed to resolve local address: %s", err) + } + + if err == nil { + d := net.Dialer{ + LocalAddr: laddr, + Timeout: time.Duration(tick-1) * time.Second, + Control: func(network, address string, c syscall.RawConn) error { + return dialerControl(network, address, c, ttl, ttlMin, password) + }, + } + + conn, err := d.DialContext(ctx, "tcp", net.JoinHostPort(addr, fmt.Sprintf("%d", port))) + select { + case <-ctx.Done(): + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + }).Debug("stop connect loop") + return + default: + } + + if err == nil { + select { + case fsm.connCh <- conn: + return + default: + conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + }).Warn("active conn is closed to avoid being blocked") + } + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + }).Debugf("failed to connect: %s", err) + } + } + } +} + +func (h *fsmHandler) active(ctx context.Context) (bgp.FSMState, *fsmStateReason) { + c, cancel := context.WithCancel(ctx) + + var wg sync.WaitGroup + wg.Add(1) + go h.connectLoop(c, &wg) + + defer func() { + cancel() + wg.Wait() + }() + + fsm := h.fsm + for { + select { + case <-ctx.Done(): return -1, newfsmStateReason(fsmDying, nil, nil) case conn, ok := <-fsm.connCh: if !ok { @@ -1118,8 +1130,11 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) { return fmsg, nil } -func (h *fsmHandler) recvMessage() error { - defer h.msgCh.Close() +func (h *fsmHandler) recvMessage(ctx context.Context, wg *sync.WaitGroup) error { + defer func() { + h.msgCh.Close() + wg.Done() + }() fmsg, _ := h.recvMessageWithError() if fmsg != nil { h.msgCh.In() <- fmsg @@ -1184,7 +1199,7 @@ func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode] return capMap, negotiated } -func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) { +func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReason) { fsm := h.fsm fsm.lock.RLock() @@ -1201,7 +1216,10 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) { h.conn = fsm.conn fsm.lock.RUnlock() - h.t.Go(h.recvMessage) + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + go h.recvMessage(ctx, &wg) // RFC 4271 P.60 // sets its HoldTimer to a large value @@ -1213,7 +1231,7 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) { for { select { - case <-h.t.Dying(): + case <-ctx.Done(): h.conn.Close() return -1, newfsmStateReason(fsmDying, nil, nil) case conn, ok := <-fsm.connCh: @@ -1420,7 +1438,6 @@ func (h *fsmHandler) opensent() (bgp.FSMState, *fsmStateReason) { return bgp.BGP_FSM_IDLE, &err case <-holdTimer.C: m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired") - h.t.Kill(nil) return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmHoldTimerExpired, m, nil) case stateOp := <-fsm.adminStateCh: err := h.changeadminState(stateOp.State) @@ -1457,14 +1474,17 @@ func keepaliveTicker(fsm *fsm) *time.Ticker { return time.NewTicker(sec) } -func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) { +func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) h.msgCh = channels.NewInfiniteChannel() fsm.lock.RLock() h.conn = fsm.conn - h.t.Go(h.recvMessage) + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + go h.recvMessage(ctx, &wg) var holdTimer *time.Timer if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { @@ -1478,7 +1498,7 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) { for { select { - case <-h.t.Dying(): + case <-ctx.Done(): h.conn.Close() return -1, newfsmStateReason(fsmDying, nil, nil) case conn, ok := <-fsm.connCh: @@ -1544,7 +1564,6 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) { return bgp.BGP_FSM_IDLE, &err case <-holdTimer.C: m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired") - h.t.Kill(nil) return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmHoldTimerExpired, m, nil) case stateOp := <-fsm.adminStateCh: err := h.changeadminState(stateOp.State) @@ -1566,7 +1585,8 @@ func (h *fsmHandler) openconfirm() (bgp.FSMState, *fsmStateReason) { } } -func (h *fsmHandler) sendMessageloop() error { +func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) error { + defer wg.Done() conn := h.conn fsm := h.fsm ticker := keepaliveTicker(fsm) @@ -1678,27 +1698,31 @@ func (h *fsmHandler) sendMessageloop() error { for { select { - case <-h.t.Dying(): + case <-ctx.Done(): return nil case o := <-h.outgoing.Out(): - m := o.(*fsmOutgoingMsg) - h.fsm.lock.RLock() - options := h.fsm.marshallingOptions - h.fsm.lock.RUnlock() - for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) { - if err := send(msg); err != nil { - return nil - } - } - if m.Notification != nil { - if m.StayIdle { - // current user is only prefix-limit - // fix me if this is not the case - h.changeadminState(adminStatePfxCt) + switch m := o.(type) { + case *fsmOutgoingMsg: + h.fsm.lock.RLock() + options := h.fsm.marshallingOptions + h.fsm.lock.RUnlock() + for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) { + if err := send(msg); err != nil { + return nil + } } - if err := send(m.Notification); err != nil { - return nil + if m.Notification != nil { + if m.StayIdle { + // current user is only prefix-limit + // fix me if this is not the case + h.changeadminState(adminStatePfxCt) + } + if err := send(m.Notification); err != nil { + return nil + } } + default: + return nil } case <-ticker.C: if err := send(bgp.NewBGPKeepAliveMessage()); err != nil { @@ -1708,7 +1732,8 @@ func (h *fsmHandler) sendMessageloop() error { } } -func (h *fsmHandler) recvMessageloop() error { +func (h *fsmHandler) recvMessageloop(ctx context.Context, wg *sync.WaitGroup) error { + defer wg.Done() for { fmsg, err := h.recvMessageWithError() if fmsg != nil { @@ -1720,14 +1745,19 @@ func (h *fsmHandler) recvMessageloop() error { } } -func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) { +func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateReason) { + var wg sync.WaitGroup fsm := h.fsm fsm.lock.Lock() h.conn = fsm.conn fsm.lock.Unlock() - h.t.Go(h.sendMessageloop) + + defer wg.Wait() + wg.Add(2) + + go h.sendMessageloop(ctx, &wg) h.msgCh = h.incoming - h.t.Go(h.recvMessageloop) + go h.recvMessageloop(ctx, &wg) var holdTimer *time.Timer if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { @@ -1742,7 +1772,8 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) { for { select { - case <-h.t.Dying(): + case <-ctx.Done(): + h.conn.Close() return -1, newfsmStateReason(fsmDying, nil, nil) case conn, ok := <-fsm.connCh: if !ok { @@ -1758,7 +1789,12 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) { fsm.lock.RUnlock() case err := <-h.stateReasonCh: h.conn.Close() - h.t.Kill(nil) + // if recv goroutine hit an error and sent to + // stateReasonCh, then tx goroutine might take + // long until it exits because it waits for + // ctx.Done() or keepalive timer. So let kill + // it now. + h.outgoing.In() <- err fsm.lock.RLock() if s := fsm.pConf.GracefulRestart.State; s.Enabled && (s.NotificationEnabled && err.Type == fsmNotificationRecv || @@ -1804,43 +1840,38 @@ func (h *fsmHandler) established() (bgp.FSMState, *fsmStateReason) { } } -func (h *fsmHandler) loop() error { +func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error { + defer wg.Done() + fsm := h.fsm - ch := make(chan bgp.FSMState) fsm.lock.RLock() oldState := fsm.state fsm.lock.RUnlock() var reason *fsmStateReason - f := func() error { - nextState := bgp.FSMState(-1) - fsm.lock.RLock() - fsmState := fsm.state - fsm.lock.RUnlock() - switch fsmState { - case bgp.BGP_FSM_IDLE: - nextState, reason = h.idle() - // case bgp.BGP_FSM_CONNECT: - // nextState = h.connect() - case bgp.BGP_FSM_ACTIVE: - nextState, reason = h.active() - case bgp.BGP_FSM_OPENSENT: - nextState, reason = h.opensent() - case bgp.BGP_FSM_OPENCONFIRM: - nextState, reason = h.openconfirm() - case bgp.BGP_FSM_ESTABLISHED: - nextState, reason = h.established() - } - fsm.reason = reason - ch <- nextState - return nil - } - - h.t.Go(f) + nextState := bgp.FSMState(-1) + fsm.lock.RLock() + fsmState := fsm.state + fsm.lock.RUnlock() - nextState := <-ch + switch fsmState { + case bgp.BGP_FSM_IDLE: + nextState, reason = h.idle(ctx) + // case bgp.BGP_FSM_CONNECT: + // nextState = h.connect() + case bgp.BGP_FSM_ACTIVE: + nextState, reason = h.active(ctx) + case bgp.BGP_FSM_OPENSENT: + nextState, reason = h.opensent(ctx) + case bgp.BGP_FSM_OPENCONFIRM: + nextState, reason = h.openconfirm(ctx) + case bgp.BGP_FSM_ESTABLISHED: + nextState, reason = h.established(ctx) + } fsm.lock.RLock() + fsm.reason = reason + if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM { log.WithFields(log.Fields{ "Topic": "Peer", @@ -1867,13 +1898,7 @@ func (h *fsmHandler) loop() error { } fsm.lock.RUnlock() - e := time.AfterFunc(time.Second*120, func() { - log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState) - }) - h.t.Wait() - e.Stop() - - // under zero means that tomb.Dying() + // under zero means that the context was canceled. if nextState >= bgp.BGP_FSM_IDLE { fsm.lock.RLock() h.stateCh <- &fsmMsg{ diff --git a/pkg/server/fsm_test.go b/pkg/server/fsm_test.go index 1e245a7e..cd3eb7e2 100644 --- a/pkg/server/fsm_test.go +++ b/pkg/server/fsm_test.go @@ -16,6 +16,7 @@ package server import ( + "context" "errors" "net" "strconv" @@ -181,7 +182,7 @@ func TestFSMHandlerOpensent_HoldTimerExpired(t *testing.T) { // set holdtime p.fsm.opensentHoldTime = 2 - state, _ := h.opensent() + state, _ := h.opensent(context.Background()) assert.Equal(bgp.BGP_FSM_IDLE, state) lastMsg := m.sendBuf[len(m.sendBuf)-1] @@ -206,7 +207,7 @@ func TestFSMHandlerOpenconfirm_HoldTimerExpired(t *testing.T) { // set holdtime p.fsm.pConf.Timers.State.NegotiatedHoldTime = 2 - state, _ := h.openconfirm() + state, _ := h.openconfirm(context.Background()) assert.Equal(bgp.BGP_FSM_IDLE, state) lastMsg := m.sendBuf[len(m.sendBuf)-1] @@ -244,7 +245,7 @@ func TestFSMHandlerEstablish_HoldTimerExpired(t *testing.T) { p.fsm.pConf.Timers.State.NegotiatedHoldTime = 2 go pushPackets() - state, _ := h.established() + state, _ := h.established(context.Background()) time.Sleep(time.Second * 1) assert.Equal(bgp.BGP_FSM_IDLE, state) m.mtx.Lock() @@ -270,7 +271,7 @@ func TestFSMHandlerOpenconfirm_HoldtimeZero(t *testing.T) { p.fsm.pConf.Timers.Config.KeepaliveInterval = 1 // set holdtime p.fsm.pConf.Timers.State.NegotiatedHoldTime = 0 - go h.openconfirm() + go h.openconfirm(context.Background()) time.Sleep(100 * time.Millisecond) @@ -292,7 +293,7 @@ func TestFSMHandlerEstablished_HoldtimeZero(t *testing.T) { // set holdtime p.fsm.pConf.Timers.State.NegotiatedHoldTime = 0 - go h.established() + go h.established(context.Background()) time.Sleep(100 * time.Millisecond) diff --git a/pkg/server/peer.go b/pkg/server/peer.go index 83a5d12f..4e51137a 100644 --- a/pkg/server/peer.go +++ b/pkg/server/peer.go @@ -29,10 +29,11 @@ import ( ) const ( - flopThreshold = time.Second * 30 - minConnectRetry = 10 + flopThreshold = time.Second * 30 ) +var minConnectRetry = 10 + type peerGroup struct { Conf *config.PeerGroup members map[string]config.Neighbor @@ -560,6 +561,7 @@ func (peer *peer) stopFSM() error { failed := false peer.fsm.lock.RLock() addr := peer.fsm.pConf.State.NeighborAddress + state := peer.fsm.state peer.fsm.lock.RUnlock() t1 := time.AfterFunc(time.Minute*5, func() { log.WithFields(log.Fields{ @@ -567,33 +569,16 @@ func (peer *peer) stopFSM() error { }).Warnf("Failed to free the fsm.h.t for %s", addr) failed = true }) - - peer.fsm.h.t.Kill(nil) - peer.fsm.h.t.Wait() + peer.fsm.h.ctxCancel() + peer.fsm.h.wg.Wait() t1.Stop() if !failed { log.WithFields(log.Fields{ "Topic": "Peer", "Key": addr, + "State": state, }).Debug("freed fsm.h.t") cleanInfiniteChannel(peer.outgoing) } - failed = false - t2 := time.AfterFunc(time.Minute*5, func() { - log.WithFields(log.Fields{ - "Topic": "Peer", - }).Warnf("Failed to free the fsm.t for %s", addr) - failed = true - }) - peer.fsm.t.Kill(nil) - peer.fsm.t.Wait() - t2.Stop() - if !failed { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": addr, - }).Debug("freed fsm.t") - return nil - } return fmt.Errorf("Failed to free FSM for %s", addr) } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index c8975e41..3bd47304 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -169,6 +169,8 @@ func TestListPolicyAssignment(t *testing.T) { } func TestMonitor(test *testing.T) { + minConnectRetry = 5 + assert := assert.New(test) s := NewBgpServer() go s.Serve() @@ -220,7 +222,7 @@ func TestMonitor(test *testing.T) { }, Timers: config.Timers{ Config: config.TimersConfig{ - ConnectRetry: 10, + ConnectRetry: 5, }, }, } @@ -526,6 +528,8 @@ func TestFilterpathWithRejectPolicy(t *testing.T) { } func TestPeerGroup(test *testing.T) { + minConnectRetry = 5 + assert := assert.New(test) log.SetLevel(log.DebugLevel) s := NewBgpServer() @@ -545,6 +549,11 @@ func TestPeerGroup(test *testing.T) { PeerAs: 2, PeerGroupName: "g", }, + Timers: config.Timers{ + Config: config.TimersConfig{ + ConnectRetry: 5, + }, + }, } err = s.addPeerGroup(g) assert.Nil(err) @@ -559,6 +568,11 @@ func TestPeerGroup(test *testing.T) { PassiveMode: true, }, }, + Timers: config.Timers{ + Config: config.TimersConfig{ + ConnectRetry: 5, + }, + }, } configured := map[string]interface{}{ "config": map[string]interface{}{ @@ -599,7 +613,7 @@ func TestPeerGroup(test *testing.T) { }, Timers: config.Timers{ Config: config.TimersConfig{ - ConnectRetry: 10, + ConnectRetry: 5, }, }, } @@ -615,6 +629,8 @@ func TestPeerGroup(test *testing.T) { } func TestDynamicNeighbor(t *testing.T) { + minConnectRetry = 5 + assert := assert.New(t) log.SetLevel(log.DebugLevel) s1 := NewBgpServer() @@ -634,6 +650,11 @@ func TestDynamicNeighbor(t *testing.T) { PeerAs: 2, PeerGroupName: "g", }, + Timers: config.Timers{ + Config: config.TimersConfig{ + ConnectRetry: 5, + }, + }, } err = s1.addPeerGroup(g) assert.Nil(err) @@ -671,7 +692,7 @@ func TestDynamicNeighbor(t *testing.T) { }, Timers: config.Timers{ Config: config.TimersConfig{ - ConnectRetry: 10, + ConnectRetry: 5, }, }, } @@ -688,6 +709,8 @@ func TestDynamicNeighbor(t *testing.T) { } func TestGracefulRestartTimerExpired(t *testing.T) { + minConnectRetry = 5 + assert := assert.New(t) s1 := NewBgpServer() go s1.Serve() @@ -714,7 +737,7 @@ func TestGracefulRestartTimerExpired(t *testing.T) { GracefulRestart: config.GracefulRestart{ Config: config.GracefulRestartConfig{ Enabled: true, - RestartTime: 1, + RestartTime: 5, }, }, } @@ -751,7 +774,7 @@ func TestGracefulRestartTimerExpired(t *testing.T) { }, Timers: config.Timers{ Config: config.TimersConfig{ - ConnectRetry: 10, + ConnectRetry: 5, }, }, } @@ -831,30 +854,19 @@ func TestFamiliesForSoftreset(t *testing.T) { assert.NotContains(t, families, bgp.RF_RTC_UC) } -func runNewServer(ctx context.Context, as uint32, routerID string, listenPort int32) (*BgpServer, context.CancelFunc, error) { +func runNewServer(as uint32, routerID string, listenPort int32) *BgpServer { s := NewBgpServer() - ctxInner, cancelInner := context.WithCancel(ctx) go s.Serve() - go func() { - <-ctxInner.Done() - stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if err := s.StopBgp(stopCtx, &api.StopBgpRequest{}); err != nil { - log.Fatalf("Failed to stop server %s: %s", s.bgpConfig.Global.Config.RouterId, err) - } - cancel() - }() - - err := s.StartBgp(ctx, &api.StartBgpRequest{ + if err := s.StartBgp(context.Background(), &api.StartBgpRequest{ Global: &api.Global{ As: as, RouterId: routerID, ListenPort: listenPort, }, - }) - if err != nil { - s = nil + }); err != nil { + log.Fatalf("Failed to start server %s: %s", s.bgpConfig.Global.Config.RouterId, err) } - return s, cancelInner, err + return s } func peerServers(t *testing.T, ctx context.Context, servers []*BgpServer, families []config.AfiSafiType) error { @@ -875,6 +887,11 @@ func peerServers(t *testing.T, ctx context.Context, servers []*BgpServer, famili RemotePort: uint16(peer.bgpConfig.Global.Config.Port), }, }, + Timers: config.Timers{ + Config: config.TimersConfig{ + ConnectRetry: 5, + }, + }, } // first server to get neighbor config is passive to hopefully make handshake faster @@ -913,7 +930,7 @@ func parseRDRT(rdStr string) (bgp.RouteDistinguisherInterface, bgp.ExtendedCommu return rd, rt, nil } -func addVrf(t *testing.T, ctx context.Context, s *BgpServer, vrfName, rdStr string, id uint32) { +func addVrf(t *testing.T, s *BgpServer, vrfName, rdStr string, id uint32) { rd, rt, err := parseRDRT(rdStr) if err != nil { t.Fatal(err) @@ -928,34 +945,24 @@ func addVrf(t *testing.T, ctx context.Context, s *BgpServer, vrfName, rdStr stri Id: id, }, } - if err = s.AddVrf(ctx, req); err != nil { + if err = s.AddVrf(context.Background(), req); err != nil { t.Fatal(err) } } func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { - // missing it may cause Test_DialTCP_FDleak to fail - defer time.Sleep(time.Second) + minConnectRetry = 5 - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() + ctx := context.Background() log.SetLevel(log.DebugLevel) - s1, cf1, err := runNewServer(ctx, 1, "1.1.1.1", 10179) - if err != nil { - t.Fatal(err) - } - defer func() { cf1() }() - s2, cf2, err := runNewServer(ctx, 1, "2.2.2.2", 20179) - if err != nil { - t.Fatal(err) - } - defer func() { cf2() }() + s1 := runNewServer(1, "1.1.1.1", 10179) + s2 := runNewServer(1, "2.2.2.2", 20179) - addVrf(t, ctx, s1, "vrf1", "111:111", 1) - addVrf(t, ctx, s2, "vrf1", "111:111", 1) + addVrf(t, s1, "vrf1", "111:111", 1) + addVrf(t, s2, "vrf1", "111:111", 1) - if err = peerServers(t, ctx, []*BgpServer{s1, s2}, []config.AfiSafiType{config.AFI_SAFI_TYPE_L3VPN_IPV4_UNICAST, config.AFI_SAFI_TYPE_RTC}); err != nil { + if err := peerServers(t, ctx, []*BgpServer{s1, s2}, []config.AfiSafiType{config.AFI_SAFI_TYPE_L3VPN_IPV4_UNICAST, config.AFI_SAFI_TYPE_RTC}); err != nil { t.Fatal(err) } watcher := s1.watch(watchUpdate(true)) @@ -977,7 +984,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { } // s1 should receive this route from s2 - + t1 := time.NewTimer(time.Duration(30 * time.Second)) for found := false; !found; { select { case ev := <-watcher.Event(): @@ -995,10 +1002,11 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { } } } - case <-ctx.Done(): + case <-t1.C: t.Fatalf("timeout while waiting for update path event") } } + t1.Stop() // fabricate duplicated rtc message from s1 // s2 should not send vpn route again @@ -1020,7 +1028,7 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { s1Peer := s2.neighborMap["127.0.0.1"] s2.propagateUpdate(s1Peer, []*table.Path{rtcPath}) - awaitUpdateCtx, cancel := context.WithTimeout(ctx, time.Second) + t2 := time.NewTimer(time.Duration(2 * time.Second)) for done := false; !done; { select { case ev := <-watcher.Event(): @@ -1033,11 +1041,12 @@ func TestDoNotReactToDuplicateRTCMemberships(t *testing.T) { } } } - //case <-timer.C: - case <-awaitUpdateCtx.Done(): + case <-t2.C: log.Infof("await update done") done = true } } - cancel() + + s1.StopBgp(context.Background(), &api.StopBgpRequest{}) + s2.StopBgp(context.Background(), &api.StopBgpRequest{}) } -- cgit v1.2.3