summaryrefslogtreecommitdiffhomepage
path: root/pkg/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@gmail.com>2019-03-23 20:48:18 +0900
committerFUJITA Tomonori <fujita.tomonori@gmail.com>2019-03-24 20:34:21 +0900
commitc6aef93ef141b7eba7d682bf3e52cc82b41b926b (patch)
tree4a32f87754e7acea27862a593eade2d9d6ddc201 /pkg/server
parentddf9e5572f144ebac031e4128c8e3432470c6ef7 (diff)
fix race of peer deletion
Fixed a race bug that caused the unittest failure. Also fixed StopBgp() to block until all the peers are deleted cleanly. Signed-off-by: FUJITA Tomonori <fujita.tomonori@gmail.com>
Diffstat (limited to 'pkg/server')
-rw-r--r--pkg/server/fsm.go49
-rw-r--r--pkg/server/fsm_test.go5
-rw-r--r--pkg/server/peer.go33
-rw-r--r--pkg/server/server.go344
-rw-r--r--pkg/server/server_test.go103
5 files changed, 278 insertions, 256 deletions
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go
index 6eadfa92..e66054c7 100644
--- a/pkg/server/fsm.go
+++ b/pkg/server/fsm.go
@@ -123,13 +123,13 @@ const (
type fsmMsg struct {
MsgType fsmMsgType
+ fsm *fsm
MsgSrc string
MsgData interface{}
StateReason *fsmStateReason
PathList []*table.Path
timestamp time.Time
payload []byte
- Version uint
}
type fsmOutgoingMsg struct {
@@ -169,13 +169,13 @@ type adminStateOperation struct {
Communication []byte
}
-var fsmVersion uint
-
type fsm struct {
gConf *config.Global
pConf *config.Neighbor
lock sync.RWMutex
state bgp.FSMState
+ outgoingCh *channels.InfiniteChannel
+ incomingCh *channels.InfiniteChannel
reason *fsmStateReason
conn net.Conn
connCh chan net.Conn
@@ -191,8 +191,8 @@ type fsm struct {
policy *table.RoutingPolicy
gracefulRestartTimer *time.Timer
twoByteAsTrans bool
- version uint
marshallingOptions *bgp.MarshallingOption
+ notification chan *bgp.BGPMessage
}
func (fsm *fsm) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
@@ -267,11 +267,12 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
}
pConf.State.SessionState = config.IntToSessionStateMap[int(bgp.BGP_FSM_IDLE)]
pConf.Timers.State.Downtime = time.Now().Unix()
- fsmVersion++
fsm := &fsm{
gConf: gConf,
pConf: pConf,
state: bgp.BGP_FSM_IDLE,
+ outgoingCh: channels.NewInfiniteChannel(),
+ incomingCh: channels.NewInfiniteChannel(),
connCh: make(chan net.Conn, 1),
opensentHoldTime: float64(holdtimeOpensent),
adminState: adminState,
@@ -281,7 +282,7 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
peerInfo: table.NewPeerInfo(gConf, pConf),
policy: policy,
gracefulRestartTimer: time.NewTimer(time.Hour),
- version: fsmVersion,
+ notification: make(chan *bgp.BGPMessage, 1),
}
fsm.gracefulRestartTimer.Stop()
return fsm
@@ -382,7 +383,6 @@ type fsmHandler struct {
msgCh *channels.InfiniteChannel
stateReasonCh chan fsmStateReason
incoming *channels.InfiniteChannel
- stateCh chan *fsmMsg
outgoing *channels.InfiniteChannel
holdTimerResetCh chan bool
sentNotification *bgp.BGPMessage
@@ -391,13 +391,12 @@ type fsmHandler struct {
wg *sync.WaitGroup
}
-func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *fsmMsg, outgoing *channels.InfiniteChannel) *fsmHandler {
+func newFSMHandler(fsm *fsm, outgoing *channels.InfiniteChannel) *fsmHandler {
ctx, cancel := context.WithCancel(context.Background())
h := &fsmHandler{
fsm: fsm,
stateReasonCh: make(chan fsmStateReason, 2),
- incoming: incoming,
- stateCh: stateCh,
+ incoming: fsm.incomingCh,
outgoing: outgoing,
holdTimerResetCh: make(chan bool, 2),
wg: &sync.WaitGroup{},
@@ -944,10 +943,10 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
"error": err,
}).Warn("Session will be reset due to malformed BGP Header")
fmsg := &fsmMsg{
+ fsm: h.fsm,
MsgType: fsmMsgBGPMessage,
MsgSrc: h.fsm.pConf.State.NeighborAddress,
MsgData: err,
- Version: h.fsm.version,
}
h.fsm.lock.RUnlock()
return fmsg, err
@@ -977,10 +976,10 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
}
h.fsm.lock.RLock()
fmsg := &fsmMsg{
+ fsm: h.fsm,
MsgType: fsmMsgBGPMessage,
MsgSrc: h.fsm.pConf.State.NeighborAddress,
timestamp: now,
- Version: h.fsm.version,
}
h.fsm.lock.RUnlock()
@@ -1757,6 +1756,13 @@ func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateRe
for {
select {
case <-ctx.Done():
+ select {
+ case m := <-fsm.notification:
+ b, _ := m.Serialize(h.fsm.marshallingOptions)
+ h.conn.Write(b)
+ default:
+ // nothing to do
+ }
h.conn.Close()
return -1, newfsmStateReason(fsmDying, nil, nil)
case conn, ok := <-fsm.connCh:
@@ -1881,18 +1887,15 @@ func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error {
}
fsm.lock.RUnlock()
- // under zero means that the context was canceled.
- if nextState >= bgp.BGP_FSM_IDLE {
- fsm.lock.RLock()
- h.stateCh <- &fsmMsg{
- MsgType: fsmMsgStateChange,
- MsgSrc: fsm.pConf.State.NeighborAddress,
- MsgData: nextState,
- StateReason: reason,
- Version: h.fsm.version,
- }
- fsm.lock.RUnlock()
+ fsm.lock.RLock()
+ h.incoming.In() <- &fsmMsg{
+ fsm: fsm,
+ MsgType: fsmMsgStateChange,
+ MsgSrc: fsm.pConf.State.NeighborAddress,
+ MsgData: nextState,
+ StateReason: reason,
}
+ fsm.lock.RUnlock()
return nil
}
diff --git a/pkg/server/fsm_test.go b/pkg/server/fsm_test.go
index cd3eb7e2..ac5ffd7b 100644
--- a/pkg/server/fsm_test.go
+++ b/pkg/server/fsm_test.go
@@ -311,15 +311,14 @@ func TestCheckOwnASLoop(t *testing.T) {
func makePeerAndHandler() (*peer, *fsmHandler) {
p := &peer{
- fsm: newFSM(&config.Global{}, &config.Neighbor{}, table.NewRoutingPolicy()),
- outgoing: channels.NewInfiniteChannel(),
+ fsm: newFSM(&config.Global{}, &config.Neighbor{}, table.NewRoutingPolicy()),
}
h := &fsmHandler{
fsm: p.fsm,
stateReasonCh: make(chan fsmStateReason, 2),
incoming: channels.NewInfiniteChannel(),
- outgoing: p.outgoing,
+ outgoing: channels.NewInfiniteChannel(),
}
return p, h
diff --git a/pkg/server/peer.go b/pkg/server/peer.go
index b4a7b23c..d7fb46c3 100644
--- a/pkg/server/peer.go
+++ b/pkg/server/peer.go
@@ -24,7 +24,6 @@ import (
"github.com/osrg/gobgp/internal/pkg/table"
"github.com/osrg/gobgp/pkg/packet/bgp"
- "github.com/eapache/channels"
log "github.com/sirupsen/logrus"
)
@@ -97,7 +96,6 @@ type peer struct {
tableId string
fsm *fsm
adjRibIn *table.AdjRib
- outgoing *channels.InfiniteChannel
policy *table.RoutingPolicy
localRib *table.TableManager
prefixLimitWarned map[bgp.RouteFamily]bool
@@ -106,7 +104,6 @@ type peer struct {
func newPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, policy *table.RoutingPolicy) *peer {
peer := &peer{
- outgoing: channels.NewInfiniteChannel(),
localRib: loc,
policy: policy,
fsm: newFSM(g, conf, policy),
@@ -528,8 +525,8 @@ func (peer *peer) handleUpdate(e *fsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg
return nil, nil, nil
}
-func (peer *peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *fsmMsg) {
- handler := newFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
+func (peer *peer) startFSMHandler() {
+ handler := newFSMHandler(peer.fsm, peer.fsm.outgoingCh)
peer.fsm.lock.Lock()
peer.fsm.h = handler
peer.fsm.lock.Unlock()
@@ -554,29 +551,3 @@ func (peer *peer) PassConn(conn *net.TCPConn) {
func (peer *peer) DropAll(rfList []bgp.RouteFamily) {
peer.adjRibIn.Drop(rfList)
}
-
-func (peer *peer) stopFSM() error {
- failed := false
- peer.fsm.lock.RLock()
- addr := peer.fsm.pConf.State.NeighborAddress
- state := peer.fsm.state
- peer.fsm.lock.RUnlock()
- t1 := time.AfterFunc(time.Minute*5, func() {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Warnf("Failed to free the fsm.h.t for %s", addr)
- failed = true
- })
- peer.fsm.h.ctxCancel()
- peer.fsm.h.wg.Wait()
- t1.Stop()
- if !failed {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": addr,
- "State": state,
- }).Debug("freed fsm.h.t")
- cleanInfiniteChannel(peer.outgoing)
- }
- return fmt.Errorf("failed to free FSM for %s", addr)
-}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7e7c370f..cdc557b5 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
+ "reflect"
"strconv"
"sync"
"time"
@@ -117,11 +118,9 @@ func GrpcOption(opt []grpc.ServerOption) ServerOption {
}
type BgpServer struct {
- bgpConfig config.Bgp
- fsmincomingCh *channels.InfiniteChannel
- fsmStateCh chan *fsmMsg
- acceptCh chan *net.TCPConn
-
+ bgpConfig config.Bgp
+ acceptCh chan *net.TCPConn
+ incomings []*channels.InfiniteChannel
mgmtCh chan *mgmtOp
policy *table.RoutingPolicy
listeners []*tcpListener
@@ -169,6 +168,19 @@ func NewBgpServer(opt ...ServerOption) *BgpServer {
return s
}
+func (s *BgpServer) addIncoming(ch *channels.InfiniteChannel) {
+ s.incomings = append(s.incomings, ch)
+}
+
+func (s *BgpServer) delIncoming(ch *channels.InfiniteChannel) {
+ for i, c := range s.incomings {
+ if c == ch {
+ s.incomings = append(s.incomings[:i], s.incomings[i+1:]...)
+ return
+ }
+ }
+}
+
func (s *BgpServer) listListeners(addr string) []*net.TCPListener {
list := make([]*net.TCPListener, 0, len(s.listeners))
rhs := net.ParseIP(addr).To4() != nil
@@ -216,152 +228,173 @@ func (s *BgpServer) mgmtOperation(f func() error, checkActive bool) (err error)
return
}
-func (s *BgpServer) Serve() {
- s.listeners = make([]*tcpListener, 0, 2)
- s.fsmincomingCh = channels.NewInfiniteChannel()
- s.fsmStateCh = make(chan *fsmMsg, 4096)
-
- handlefsmMsg := func(e *fsmMsg) {
- peer, found := s.neighborMap[e.MsgSrc]
- if !found {
+func (s *BgpServer) passConnToPeer(conn *net.TCPConn) {
+ host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
+ ipaddr, _ := net.ResolveIPAddr("ip", host)
+ remoteAddr := ipaddr.String()
+ peer, found := s.neighborMap[remoteAddr]
+ if found {
+ peer.fsm.lock.RLock()
+ adminStateNotUp := peer.fsm.adminState != adminStateUp
+ peer.fsm.lock.RUnlock()
+ if adminStateNotUp {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Warnf("Can't find the neighbor %s", e.MsgSrc)
+ "Topic": "Peer",
+ "Remote Addr": remoteAddr,
+ "Admin State": peer.fsm.adminState,
+ }).Debug("New connection for non admin-state-up peer")
+ peer.fsm.lock.RUnlock()
+ conn.Close()
return
}
peer.fsm.lock.RLock()
- versionMismatch := e.Version != peer.fsm.version
+ localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
peer.fsm.lock.RUnlock()
- if versionMismatch {
+ localAddrValid := func(laddr string) bool {
+ if laddr == "0.0.0.0" || laddr == "::" {
+ return true
+ }
+ l := conn.LocalAddr()
+ if l == nil {
+ // already closed
+ return false
+ }
+
+ host, _, _ := net.SplitHostPort(l.String())
+ if host != laddr {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": remoteAddr,
+ "Configured addr": laddr,
+ "Addr": host,
+ }).Info("Mismatched local address")
+ return false
+ }
+ return true
+ }(localAddr)
+
+ if !localAddrValid {
+ conn.Close()
+ return
+ }
+
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Debugf("Accepted a new passive connection from:%s", remoteAddr)
+ peer.PassConn(conn)
+ } else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr)
+ rib := s.globalRib
+ if pg.Conf.RouteServer.Config.RouteServerClient {
+ rib = s.rsRib
+ }
+ peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy)
+ if peer == nil {
log.WithFields(log.Fields{
"Topic": "Peer",
- }).Debug("FSM version inconsistent")
+ "Key": remoteAddr,
+ }).Infof("Can't create new Dynamic Peer")
+ conn.Close()
return
}
- s.handleFSMMessage(peer, e)
+ s.addIncoming(peer.fsm.incomingCh)
+ peer.fsm.lock.RLock()
+ policy := peer.fsm.pConf.ApplyPolicy
+ peer.fsm.lock.RUnlock()
+ s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
+ s.neighborMap[remoteAddr] = peer
+ peer.startFSMHandler()
+ s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
+ peer.PassConn(conn)
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr)
+ conn.Close()
}
+}
- for {
- passConn := func(conn *net.TCPConn) {
- host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
- ipaddr, _ := net.ResolveIPAddr("ip", host)
- remoteAddr := ipaddr.String()
- peer, found := s.neighborMap[remoteAddr]
- if found {
- peer.fsm.lock.RLock()
- adminStateNotUp := peer.fsm.adminState != adminStateUp
- peer.fsm.lock.RUnlock()
- if adminStateNotUp {
- peer.fsm.lock.RLock()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Remote Addr": remoteAddr,
- "Admin State": peer.fsm.adminState,
- }).Debug("New connection for non admin-state-up peer")
- peer.fsm.lock.RUnlock()
- conn.Close()
- return
- }
- peer.fsm.lock.RLock()
- localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
- peer.fsm.lock.RUnlock()
- localAddrValid := func(laddr string) bool {
- if laddr == "0.0.0.0" || laddr == "::" {
- return true
- }
- l := conn.LocalAddr()
- if l == nil {
- // already closed
- return false
- }
+const firstPeerCaseIndex = 3
- host, _, _ := net.SplitHostPort(l.String())
- if host != laddr {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": remoteAddr,
- "Configured addr": laddr,
- "Addr": host,
- }).Info("Mismatched local address")
- return false
- }
- return true
- }(localAddr)
+func (s *BgpServer) Serve() {
+ s.listeners = make([]*tcpListener, 0, 2)
- if !localAddrValid {
- conn.Close()
- return
- }
+ handlefsmMsg := func(e *fsmMsg) {
+ fsm := e.fsm
+ if fsm.h.ctx.Err() != nil {
+ // canceled
+ addr := fsm.pConf.State.NeighborAddress
+ state := fsm.state
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Debugf("Accepted a new passive connection from:%s", remoteAddr)
- peer.PassConn(conn)
- } else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr)
- rib := s.globalRib
- if pg.Conf.RouteServer.Config.RouteServerClient {
- rib = s.rsRib
- }
- peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy)
- if peer == nil {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": remoteAddr,
- }).Infof("Can't create new Dynamic Peer")
- conn.Close()
- return
- }
- peer.fsm.lock.RLock()
- policy := peer.fsm.pConf.ApplyPolicy
- peer.fsm.lock.RUnlock()
- s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
- s.neighborMap[remoteAddr] = peer
- peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
- s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
- peer.PassConn(conn)
- } else {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr)
- conn.Close()
+ fsm.h.wg.Wait()
+
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ "State": state,
+ }).Debug("freed fsm.h")
+
+ cleanInfiniteChannel(fsm.outgoingCh)
+ cleanInfiniteChannel(fsm.incomingCh)
+ s.delIncoming(fsm.incomingCh)
+ if s.shutdownWG != nil && len(s.incomings) == 0 {
+ s.shutdownWG.Done()
}
+ return
}
- select {
- case op := <-s.mgmtCh:
- s.handleMGMTOp(op)
- case conn := <-s.acceptCh:
- passConn(conn)
- default:
+ peer, found := s.neighborMap[e.MsgSrc]
+ if !found {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Warnf("Can't find the neighbor %s", e.MsgSrc)
+ return
}
+ s.handleFSMMessage(peer, e)
+ }
- for {
- select {
- case e := <-s.fsmStateCh:
- handlefsmMsg(e)
- default:
- goto CONT
+ for {
+ cases := make([]reflect.SelectCase, firstPeerCaseIndex+len(s.incomings))
+ cases[0] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.mgmtCh),
+ }
+ cases[1] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.acceptCh),
+ }
+ cases[2] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.roaManager.ReceiveROA()),
+ }
+ for i := firstPeerCaseIndex; i < len(cases); i++ {
+ cases[i] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex].Out()),
}
}
- CONT:
- select {
- case op := <-s.mgmtCh:
+ chosen, value, ok := reflect.Select(cases)
+ switch chosen {
+ case 0:
+ op := value.Interface().(*mgmtOp)
s.handleMGMTOp(op)
- case rmsg := <-s.roaManager.ReceiveROA():
- s.roaManager.HandleROAEvent(rmsg)
- case conn := <-s.acceptCh:
- passConn(conn)
- case e, ok := <-s.fsmincomingCh.Out():
- if !ok {
- continue
+ case 1:
+ conn := value.Interface().(*net.TCPConn)
+ s.passConnToPeer(conn)
+ case 2:
+ ev := value.Interface().(*roaEvent)
+ s.roaManager.HandleROAEvent(ev)
+ default:
+ // in the case of dynamic peer, handleFSMMessage closed incoming channel so
+ // nil fsmMsg can happen here.
+ if ok {
+ e := value.Interface().(*fsmMsg)
+ handlefsmMsg(e)
}
- handlefsmMsg(e.(*fsmMsg))
- case e := <-s.fsmStateCh:
- handlefsmMsg(e)
}
}
}
@@ -385,7 +418,7 @@ func (s *BgpServer) matchLongestDynamicNeighborPrefix(a string) *peerGroup {
}
func sendfsmOutgoingMsg(peer *peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) {
- peer.outgoing.In() <- &fsmOutgoingMsg{
+ peer.fsm.outgoingCh.In() <- &fsmOutgoingMsg{
Paths: paths,
Notification: notification,
StayIdle: stayIdle,
@@ -1155,10 +1188,11 @@ func (s *BgpServer) propagateUpdateToNeighbors(source *peer, newPath *table.Path
func (s *BgpServer) deleteDynamicNeighbor(peer *peer, oldState bgp.FSMState, e *fsmMsg) {
peer.stopPeerRestarting()
- go peer.stopFSM()
peer.fsm.lock.RLock()
delete(s.neighborMap, peer.fsm.pConf.State.NeighborAddress)
peer.fsm.lock.RUnlock()
+ cleanInfiniteChannel(peer.fsm.outgoingCh)
+ cleanInfiniteChannel(peer.fsm.incomingCh)
s.broadcastPeerState(peer, oldState, e)
}
@@ -1293,8 +1327,8 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
}
}
- cleanInfiniteChannel(peer.outgoing)
- peer.outgoing = channels.NewInfiniteChannel()
+ cleanInfiniteChannel(peer.fsm.outgoingCh)
+ peer.fsm.outgoingCh = channels.NewInfiniteChannel()
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
@@ -1393,21 +1427,6 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
}
}
} else {
- if s.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE {
- die := true
- for _, p := range s.neighborMap {
- p.fsm.lock.RLock()
- stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE
- p.fsm.lock.RUnlock()
- if stateNotIdle {
- die = false
- break
- }
- }
- if die {
- s.shutdownWG.Done()
- }
- }
peer.fsm.lock.Lock()
peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix()
peer.fsm.lock.Unlock()
@@ -1424,7 +1443,7 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
peer.fsm.pConf.Timers.State = config.TimersState{}
peer.fsm.lock.Unlock()
}
- peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
+ peer.startFSMHandler()
s.broadcastPeerState(peer, oldState, e)
case fsmMsgRouteRefresh:
peer.fsm.lock.RLock()
@@ -1624,12 +1643,18 @@ func (s *BgpServer) DeleteBmp(ctx context.Context, r *api.DeleteBmpRequest) erro
func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error {
s.mgmtOperation(func() error {
- s.shutdownWG = new(sync.WaitGroup)
- s.shutdownWG.Add(1)
-
+ names := make([]string, 0, len(s.neighborMap))
for k := range s.neighborMap {
+ names = append(names, k)
+ }
+
+ if len(names) != 0 {
+ s.shutdownWG = new(sync.WaitGroup)
+ s.shutdownWG.Add(1)
+ }
+ for _, name := range names {
if err := s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{
- NeighborAddress: k}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
+ NeighborAddress: name}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
return err
}
}
@@ -1640,13 +1665,9 @@ func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error {
return nil
}, false)
- // Waits for all goroutines per peer to stop.
- // Note: This should not be wrapped with s.mgmtOperation() in order to
- // avoid the deadlock in the main goroutine of BgpServer.
- // if s.shutdownWG != nil {
- // s.shutdownWG.Wait()
- // s.shutdownWG = nil
- // }
+ if s.shutdownWG != nil {
+ s.shutdownWG.Wait()
+ }
return nil
}
@@ -2636,12 +2657,13 @@ func (s *BgpServer) addNeighbor(c *config.Neighbor) error {
rib = s.rsRib
}
peer := newPeer(&s.bgpConfig.Global, c, rib, s.policy)
+ s.addIncoming(peer.fsm.incomingCh)
s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): c.ApplyPolicy})
s.neighborMap[addr] = peer
if name := c.Config.PeerGroup; name != "" {
s.peerGroupMap[name].AddMember(*c)
}
- peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
+ peer.startFSMHandler()
s.broadcastPeerState(peer, bgp.BGP_FSM_IDLE, nil)
return nil
}
@@ -2727,10 +2749,10 @@ func (s *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) erro
"Topic": "Peer",
}).Infof("Delete a peer configuration for:%s", addr)
- n.fsm.sendNotification(code, subcode, nil, "")
n.stopPeerRestarting()
+ n.fsm.notification <- bgp.NewBGPNotificationMessage(code, subcode, nil)
+ n.fsm.h.ctxCancel()
- go n.stopFSM()
delete(s.neighborMap, addr)
s.dropPeerAllRoutes(n, n.configuredRFlist())
return nil
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index 54a2dd00..f325f3ad 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -36,6 +36,44 @@ import (
"github.com/osrg/gobgp/pkg/packet/bgp"
)
+func TestStop(t *testing.T) {
+ assert := assert.New(t)
+ s := NewBgpServer()
+ go s.Serve()
+ err := s.StartBgp(context.Background(), &api.StartBgpRequest{
+ Global: &api.Global{
+ As: 1,
+ RouterId: "1.1.1.1",
+ ListenPort: -1,
+ },
+ })
+ assert.Nil(err)
+ s.StopBgp(context.Background(), &api.StopBgpRequest{})
+
+ s = NewBgpServer()
+ go s.Serve()
+ err = s.StartBgp(context.Background(), &api.StartBgpRequest{
+ Global: &api.Global{
+ As: 1,
+ RouterId: "1.1.1.1",
+ ListenPort: -1,
+ },
+ })
+ assert.Nil(err)
+ p := &api.Peer{
+ Conf: &api.PeerConf{
+ NeighborAddress: "2.2.2.2",
+ PeerAs: 1,
+ },
+ RouteServer: &api.RouteServer{
+ RouteServerClient: true,
+ },
+ }
+ err = s.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p})
+ assert.Nil(err)
+ s.StopBgp(context.Background(), &api.StopBgpRequest{})
+}
+
func TestModPolicyAssign(t *testing.T) {
assert := assert.New(t)
s := NewBgpServer()
@@ -184,18 +222,16 @@ func TestMonitor(test *testing.T) {
assert.Nil(err)
defer s.StopBgp(context.Background(), &api.StopBgpRequest{})
- n := &config.Neighbor{
- Config: config.NeighborConfig{
+ p1 := &api.Peer{
+ Conf: &api.PeerConf{
NeighborAddress: "127.0.0.1",
PeerAs: 2,
},
- Transport: config.Transport{
- Config: config.TransportConfig{
- PassiveMode: true,
- },
+ Transport: &api.Transport{
+ PassiveMode: true,
},
}
- err = s.addNeighbor(n)
+ err = s.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p1})
assert.Nil(err)
t := NewBgpServer()
@@ -210,15 +246,13 @@ func TestMonitor(test *testing.T) {
assert.Nil(err)
defer t.StopBgp(context.Background(), &api.StopBgpRequest{})
- m := &config.Neighbor{
- Config: config.NeighborConfig{
+ p2 := &api.Peer{
+ Conf: &api.PeerConf{
NeighborAddress: "127.0.0.1",
PeerAs: 1,
},
- Transport: config.Transport{
- Config: config.TransportConfig{
- RemotePort: 10179,
- },
+ Transport: &api.Transport{
+ RemotePort: 10179,
},
}
ch := make(chan struct{})
@@ -228,7 +262,7 @@ func TestMonitor(test *testing.T) {
}
})
- err = t.AddPeer(context.Background(), &api.AddPeerRequest{Peer: config.NewPeerFromConfigStruct(m)})
+ err = t.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p2})
assert.Nil(err)
<-ch
@@ -689,24 +723,20 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
assert.Nil(err)
defer s1.StopBgp(context.Background(), &api.StopBgpRequest{})
- n := &config.Neighbor{
- Config: config.NeighborConfig{
+ p1 := &api.Peer{
+ Conf: &api.PeerConf{
NeighborAddress: "127.0.0.1",
PeerAs: 2,
},
- Transport: config.Transport{
- Config: config.TransportConfig{
- PassiveMode: true,
- },
+ Transport: &api.Transport{
+ PassiveMode: true,
},
- GracefulRestart: config.GracefulRestart{
- Config: config.GracefulRestartConfig{
- Enabled: true,
- RestartTime: minConnectRetryInterval,
- },
+ GracefulRestart: &api.GracefulRestart{
+ Enabled: true,
+ RestartTime: minConnectRetryInterval,
},
}
- err = s1.addNeighbor(n)
+ err = s1.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p1})
assert.Nil(err)
s2 := NewBgpServer()
@@ -721,30 +751,27 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
require.NoError(t, err)
defer s2.StopBgp(context.Background(), &api.StopBgpRequest{})
- m := &config.Neighbor{
- Config: config.NeighborConfig{
+ p2 := &api.Peer{
+ Conf: &api.PeerConf{
NeighborAddress: "127.0.0.1",
PeerAs: 1,
},
- Transport: config.Transport{
- Config: config.TransportConfig{
- RemotePort: 10179,
- },
+ Transport: &api.Transport{
+ RemotePort: 10179,
},
- GracefulRestart: config.GracefulRestart{
- Config: config.GracefulRestartConfig{
- Enabled: true,
- RestartTime: 1,
- },
+ GracefulRestart: &api.GracefulRestart{
+ Enabled: true,
+ RestartTime: 1,
},
}
+
ch := make(chan struct{})
go s2.MonitorPeer(context.Background(), &api.MonitorPeerRequest{}, func(peer *api.Peer) {
if peer.State.SessionState == api.PeerState_ESTABLISHED {
close(ch)
}
})
- err = s2.addNeighbor(m)
+ err = s2.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p2})
assert.Nil(err)
<-ch