author    FUJITA Tomonori <fujita.tomonori@gmail.com>    2019-03-23 20:48:18 +0900
committer FUJITA Tomonori <fujita.tomonori@gmail.com>    2019-03-24 20:34:21 +0900
commit    c6aef93ef141b7eba7d682bf3e52cc82b41b926b
tree      4a32f87754e7acea27862a593eade2d9d6ddc201 /pkg/server/server.go
parent    ddf9e5572f144ebac031e4128c8e3432470c6ef7
fix race of peer deletion
Fixed a race bug that caused the unit test failure. Also fixed StopBgp() to block until all the peers are deleted cleanly.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@gmail.com>
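The key behavioural change is that StopBgp() now blocks until every peer's FSM goroutines have finished tearing down. As a rough illustration of that blocking-shutdown idea, here is a minimal, self-contained Go sketch built around sync.WaitGroup; the peer type and names are hypothetical, and the real patch does the bookkeeping differently (a single Add(1), released from the Serve() loop once its list of per-peer incoming channels is empty):

package main

import (
	"fmt"
	"sync"
	"time"
)

// peer is a stand-in type for illustration only; it is not the GoBGP peer.
type peer struct{ addr string }

// stop simulates tearing down a peer's FSM goroutines.
func (p *peer) stop(wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(10 * time.Millisecond) // pretend cleanup work
	fmt.Println("peer deleted:", p.addr)
}

func main() {
	peers := []*peer{{addr: "192.0.2.1"}, {addr: "192.0.2.2"}}

	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		go p.stop(&wg)
	}

	// Like the fixed StopBgp(), block here until all peers are gone.
	wg.Wait()
	fmt.Println("all peers deleted cleanly")
}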
Diffstat (limited to 'pkg/server/server.go')
-rw-r--r--    pkg/server/server.go    344
1 file changed, 183 insertions, 161 deletions
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7e7c370f..cdc557b5 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
+ "reflect"
"strconv"
"sync"
"time"
@@ -117,11 +118,9 @@ func GrpcOption(opt []grpc.ServerOption) ServerOption {
}
type BgpServer struct {
- bgpConfig config.Bgp
- fsmincomingCh *channels.InfiniteChannel
- fsmStateCh chan *fsmMsg
- acceptCh chan *net.TCPConn
-
+ bgpConfig config.Bgp
+ acceptCh chan *net.TCPConn
+ incomings []*channels.InfiniteChannel
mgmtCh chan *mgmtOp
policy *table.RoutingPolicy
listeners []*tcpListener
@@ -169,6 +168,19 @@ func NewBgpServer(opt ...ServerOption) *BgpServer {
return s
}
+func (s *BgpServer) addIncoming(ch *channels.InfiniteChannel) {
+ s.incomings = append(s.incomings, ch)
+}
+
+func (s *BgpServer) delIncoming(ch *channels.InfiniteChannel) {
+ for i, c := range s.incomings {
+ if c == ch {
+ s.incomings = append(s.incomings[:i], s.incomings[i+1:]...)
+ return
+ }
+ }
+}
+
func (s *BgpServer) listListeners(addr string) []*net.TCPListener {
list := make([]*net.TCPListener, 0, len(s.listeners))
rhs := net.ParseIP(addr).To4() != nil
@@ -216,152 +228,173 @@ func (s *BgpServer) mgmtOperation(f func() error, checkActive bool) (err error)
return
}
-func (s *BgpServer) Serve() {
- s.listeners = make([]*tcpListener, 0, 2)
- s.fsmincomingCh = channels.NewInfiniteChannel()
- s.fsmStateCh = make(chan *fsmMsg, 4096)
-
- handlefsmMsg := func(e *fsmMsg) {
- peer, found := s.neighborMap[e.MsgSrc]
- if !found {
+func (s *BgpServer) passConnToPeer(conn *net.TCPConn) {
+ host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
+ ipaddr, _ := net.ResolveIPAddr("ip", host)
+ remoteAddr := ipaddr.String()
+ peer, found := s.neighborMap[remoteAddr]
+ if found {
+ peer.fsm.lock.RLock()
+ adminStateNotUp := peer.fsm.adminState != adminStateUp
+ peer.fsm.lock.RUnlock()
+ if adminStateNotUp {
+ peer.fsm.lock.RLock()
log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Warnf("Can't find the neighbor %s", e.MsgSrc)
+ "Topic": "Peer",
+ "Remote Addr": remoteAddr,
+ "Admin State": peer.fsm.adminState,
+ }).Debug("New connection for non admin-state-up peer")
+ peer.fsm.lock.RUnlock()
+ conn.Close()
return
}
peer.fsm.lock.RLock()
- versionMismatch := e.Version != peer.fsm.version
+ localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
peer.fsm.lock.RUnlock()
- if versionMismatch {
+ localAddrValid := func(laddr string) bool {
+ if laddr == "0.0.0.0" || laddr == "::" {
+ return true
+ }
+ l := conn.LocalAddr()
+ if l == nil {
+ // already closed
+ return false
+ }
+
+ host, _, _ := net.SplitHostPort(l.String())
+ if host != laddr {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": remoteAddr,
+ "Configured addr": laddr,
+ "Addr": host,
+ }).Info("Mismatched local address")
+ return false
+ }
+ return true
+ }(localAddr)
+
+ if !localAddrValid {
+ conn.Close()
+ return
+ }
+
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Debugf("Accepted a new passive connection from:%s", remoteAddr)
+ peer.PassConn(conn)
+ } else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr)
+ rib := s.globalRib
+ if pg.Conf.RouteServer.Config.RouteServerClient {
+ rib = s.rsRib
+ }
+ peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy)
+ if peer == nil {
log.WithFields(log.Fields{
"Topic": "Peer",
- }).Debug("FSM version inconsistent")
+ "Key": remoteAddr,
+ }).Infof("Can't create new Dynamic Peer")
+ conn.Close()
return
}
- s.handleFSMMessage(peer, e)
+ s.addIncoming(peer.fsm.incomingCh)
+ peer.fsm.lock.RLock()
+ policy := peer.fsm.pConf.ApplyPolicy
+ peer.fsm.lock.RUnlock()
+ s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
+ s.neighborMap[remoteAddr] = peer
+ peer.startFSMHandler()
+ s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
+ peer.PassConn(conn)
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr)
+ conn.Close()
}
+}
- for {
- passConn := func(conn *net.TCPConn) {
- host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
- ipaddr, _ := net.ResolveIPAddr("ip", host)
- remoteAddr := ipaddr.String()
- peer, found := s.neighborMap[remoteAddr]
- if found {
- peer.fsm.lock.RLock()
- adminStateNotUp := peer.fsm.adminState != adminStateUp
- peer.fsm.lock.RUnlock()
- if adminStateNotUp {
- peer.fsm.lock.RLock()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Remote Addr": remoteAddr,
- "Admin State": peer.fsm.adminState,
- }).Debug("New connection for non admin-state-up peer")
- peer.fsm.lock.RUnlock()
- conn.Close()
- return
- }
- peer.fsm.lock.RLock()
- localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
- peer.fsm.lock.RUnlock()
- localAddrValid := func(laddr string) bool {
- if laddr == "0.0.0.0" || laddr == "::" {
- return true
- }
- l := conn.LocalAddr()
- if l == nil {
- // already closed
- return false
- }
+const firstPeerCaseIndex = 3
- host, _, _ := net.SplitHostPort(l.String())
- if host != laddr {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": remoteAddr,
- "Configured addr": laddr,
- "Addr": host,
- }).Info("Mismatched local address")
- return false
- }
- return true
- }(localAddr)
+func (s *BgpServer) Serve() {
+ s.listeners = make([]*tcpListener, 0, 2)
- if !localAddrValid {
- conn.Close()
- return
- }
+ handlefsmMsg := func(e *fsmMsg) {
+ fsm := e.fsm
+ if fsm.h.ctx.Err() != nil {
+ // canceled
+ addr := fsm.pConf.State.NeighborAddress
+ state := fsm.state
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Debugf("Accepted a new passive connection from:%s", remoteAddr)
- peer.PassConn(conn)
- } else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr)
- rib := s.globalRib
- if pg.Conf.RouteServer.Config.RouteServerClient {
- rib = s.rsRib
- }
- peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy)
- if peer == nil {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": remoteAddr,
- }).Infof("Can't create new Dynamic Peer")
- conn.Close()
- return
- }
- peer.fsm.lock.RLock()
- policy := peer.fsm.pConf.ApplyPolicy
- peer.fsm.lock.RUnlock()
- s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
- s.neighborMap[remoteAddr] = peer
- peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
- s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
- peer.PassConn(conn)
- } else {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- }).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr)
- conn.Close()
+ fsm.h.wg.Wait()
+
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": addr,
+ "State": state,
+ }).Debug("freed fsm.h")
+
+ cleanInfiniteChannel(fsm.outgoingCh)
+ cleanInfiniteChannel(fsm.incomingCh)
+ s.delIncoming(fsm.incomingCh)
+ if s.shutdownWG != nil && len(s.incomings) == 0 {
+ s.shutdownWG.Done()
}
+ return
}
- select {
- case op := <-s.mgmtCh:
- s.handleMGMTOp(op)
- case conn := <-s.acceptCh:
- passConn(conn)
- default:
+ peer, found := s.neighborMap[e.MsgSrc]
+ if !found {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ }).Warnf("Can't find the neighbor %s", e.MsgSrc)
+ return
}
+ s.handleFSMMessage(peer, e)
+ }
- for {
- select {
- case e := <-s.fsmStateCh:
- handlefsmMsg(e)
- default:
- goto CONT
+ for {
+ cases := make([]reflect.SelectCase, firstPeerCaseIndex+len(s.incomings))
+ cases[0] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.mgmtCh),
+ }
+ cases[1] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.acceptCh),
+ }
+ cases[2] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.roaManager.ReceiveROA()),
+ }
+ for i := firstPeerCaseIndex; i < len(cases); i++ {
+ cases[i] = reflect.SelectCase{
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex].Out()),
}
}
- CONT:
- select {
- case op := <-s.mgmtCh:
+ chosen, value, ok := reflect.Select(cases)
+ switch chosen {
+ case 0:
+ op := value.Interface().(*mgmtOp)
s.handleMGMTOp(op)
- case rmsg := <-s.roaManager.ReceiveROA():
- s.roaManager.HandleROAEvent(rmsg)
- case conn := <-s.acceptCh:
- passConn(conn)
- case e, ok := <-s.fsmincomingCh.Out():
- if !ok {
- continue
+ case 1:
+ conn := value.Interface().(*net.TCPConn)
+ s.passConnToPeer(conn)
+ case 2:
+ ev := value.Interface().(*roaEvent)
+ s.roaManager.HandleROAEvent(ev)
+ default:
+ // in the case of dynamic peer, handleFSMMessage closed incoming channel so
+ // nil fsmMsg can happen here.
+ if ok {
+ e := value.Interface().(*fsmMsg)
+ handlefsmMsg(e)
}
- handlefsmMsg(e.(*fsmMsg))
- case e := <-s.fsmStateCh:
- handlefsmMsg(e)
}
}
}
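For reference, the rewritten Serve() loop drops the shared fsmincomingCh/fsmStateCh pair in favour of one incoming channel per peer and multiplexes over that runtime-sized set with reflect.Select, since an ordinary select statement cannot cover a dynamic number of channels. A minimal standalone sketch of that technique follows (channel names and payloads are illustrative only, not the GoBGP types):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Fixed-purpose channels come first, then one case per peer channel,
	// mirroring the firstPeerCaseIndex layout used in the patch.
	mgmt := make(chan string, 1)
	peerChs := []chan string{make(chan string, 1), make(chan string, 1)}
	peerChs[1] <- "message from peer 1"

	const firstPeerCase = 1
	cases := make([]reflect.SelectCase, firstPeerCase+len(peerChs))
	cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mgmt)}
	for i, ch := range peerChs {
		cases[firstPeerCase+i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}

	// reflect.Select blocks until one case is ready; ok is false when the
	// chosen channel was closed, which the patch also has to check for.
	chosen, value, ok := reflect.Select(cases)
	switch {
	case chosen == 0:
		fmt.Println("mgmt op:", value.String())
	default:
		if ok {
			fmt.Printf("peer %d sent: %s\n", chosen-firstPeerCase, value.String())
		}
	}
}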
@@ -385,7 +418,7 @@ func (s *BgpServer) matchLongestDynamicNeighborPrefix(a string) *peerGroup {
}
func sendfsmOutgoingMsg(peer *peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) {
- peer.outgoing.In() <- &fsmOutgoingMsg{
+ peer.fsm.outgoingCh.In() <- &fsmOutgoingMsg{
Paths: paths,
Notification: notification,
StayIdle: stayIdle,
@@ -1155,10 +1188,11 @@ func (s *BgpServer) propagateUpdateToNeighbors(source *peer, newPath *table.Path
func (s *BgpServer) deleteDynamicNeighbor(peer *peer, oldState bgp.FSMState, e *fsmMsg) {
peer.stopPeerRestarting()
- go peer.stopFSM()
peer.fsm.lock.RLock()
delete(s.neighborMap, peer.fsm.pConf.State.NeighborAddress)
peer.fsm.lock.RUnlock()
+ cleanInfiniteChannel(peer.fsm.outgoingCh)
+ cleanInfiniteChannel(peer.fsm.incomingCh)
s.broadcastPeerState(peer, oldState, e)
}
@@ -1293,8 +1327,8 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
}
}
- cleanInfiniteChannel(peer.outgoing)
- peer.outgoing = channels.NewInfiniteChannel()
+ cleanInfiniteChannel(peer.fsm.outgoingCh)
+ peer.fsm.outgoingCh = channels.NewInfiniteChannel()
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
@@ -1393,21 +1427,6 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
}
}
} else {
- if s.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE {
- die := true
- for _, p := range s.neighborMap {
- p.fsm.lock.RLock()
- stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE
- p.fsm.lock.RUnlock()
- if stateNotIdle {
- die = false
- break
- }
- }
- if die {
- s.shutdownWG.Done()
- }
- }
peer.fsm.lock.Lock()
peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix()
peer.fsm.lock.Unlock()
@@ -1424,7 +1443,7 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
peer.fsm.pConf.Timers.State = config.TimersState{}
peer.fsm.lock.Unlock()
}
- peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
+ peer.startFSMHandler()
s.broadcastPeerState(peer, oldState, e)
case fsmMsgRouteRefresh:
peer.fsm.lock.RLock()
@@ -1624,12 +1643,18 @@ func (s *BgpServer) DeleteBmp(ctx context.Context, r *api.DeleteBmpRequest) erro
func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error {
s.mgmtOperation(func() error {
- s.shutdownWG = new(sync.WaitGroup)
- s.shutdownWG.Add(1)
-
+ names := make([]string, 0, len(s.neighborMap))
for k := range s.neighborMap {
+ names = append(names, k)
+ }
+
+ if len(names) != 0 {
+ s.shutdownWG = new(sync.WaitGroup)
+ s.shutdownWG.Add(1)
+ }
+ for _, name := range names {
if err := s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{
- NeighborAddress: k}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
+ NeighborAddress: name}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
return err
}
}
@@ -1640,13 +1665,9 @@ func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error {
return nil
}, false)
- // Waits for all goroutines per peer to stop.
- // Note: This should not be wrapped with s.mgmtOperation() in order to
- // avoid the deadlock in the main goroutine of BgpServer.
- // if s.shutdownWG != nil {
- // s.shutdownWG.Wait()
- // s.shutdownWG = nil
- // }
+ if s.shutdownWG != nil {
+ s.shutdownWG.Wait()
+ }
return nil
}
@@ -2636,12 +2657,13 @@ func (s *BgpServer) addNeighbor(c *config.Neighbor) error {
rib = s.rsRib
}
peer := newPeer(&s.bgpConfig.Global, c, rib, s.policy)
+ s.addIncoming(peer.fsm.incomingCh)
s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): c.ApplyPolicy})
s.neighborMap[addr] = peer
if name := c.Config.PeerGroup; name != "" {
s.peerGroupMap[name].AddMember(*c)
}
- peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
+ peer.startFSMHandler()
s.broadcastPeerState(peer, bgp.BGP_FSM_IDLE, nil)
return nil
}
@@ -2727,10 +2749,10 @@ func (s *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) erro
"Topic": "Peer",
}).Infof("Delete a peer configuration for:%s", addr)
- n.fsm.sendNotification(code, subcode, nil, "")
n.stopPeerRestarting()
+ n.fsm.notification <- bgp.NewBGPNotificationMessage(code, subcode, nil)
+ n.fsm.h.ctxCancel()
- go n.stopFSM()
delete(s.neighborMap, addr)
s.dropPeerAllRoutes(n, n.configuredRFlist())
return nil