diff options
author | FUJITA Tomonori <fujita.tomonori@gmail.com> | 2019-03-23 20:48:18 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@gmail.com> | 2019-03-24 20:34:21 +0900 |
commit | c6aef93ef141b7eba7d682bf3e52cc82b41b926b (patch) | |
tree | 4a32f87754e7acea27862a593eade2d9d6ddc201 /pkg/server/server.go | |
parent | ddf9e5572f144ebac031e4128c8e3432470c6ef7 (diff) |
fix race of peer deletion
Fixed a race bug that caused the unit test failure. Also fixed
StopBgp() to block until all the peers are deleted cleanly.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@gmail.com>
Diffstat (limited to 'pkg/server/server.go')
-rw-r--r-- | pkg/server/server.go | 344 |
1 files changed, 183 insertions, 161 deletions
diff --git a/pkg/server/server.go b/pkg/server/server.go index 7e7c370f..cdc557b5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "reflect" "strconv" "sync" "time" @@ -117,11 +118,9 @@ func GrpcOption(opt []grpc.ServerOption) ServerOption { } type BgpServer struct { - bgpConfig config.Bgp - fsmincomingCh *channels.InfiniteChannel - fsmStateCh chan *fsmMsg - acceptCh chan *net.TCPConn - + bgpConfig config.Bgp + acceptCh chan *net.TCPConn + incomings []*channels.InfiniteChannel mgmtCh chan *mgmtOp policy *table.RoutingPolicy listeners []*tcpListener @@ -169,6 +168,19 @@ func NewBgpServer(opt ...ServerOption) *BgpServer { return s } +func (s *BgpServer) addIncoming(ch *channels.InfiniteChannel) { + s.incomings = append(s.incomings, ch) +} + +func (s *BgpServer) delIncoming(ch *channels.InfiniteChannel) { + for i, c := range s.incomings { + if c == ch { + s.incomings = append(s.incomings[:i], s.incomings[i+1:]...) + return + } + } +} + func (s *BgpServer) listListeners(addr string) []*net.TCPListener { list := make([]*net.TCPListener, 0, len(s.listeners)) rhs := net.ParseIP(addr).To4() != nil @@ -216,152 +228,173 @@ func (s *BgpServer) mgmtOperation(f func() error, checkActive bool) (err error) return } -func (s *BgpServer) Serve() { - s.listeners = make([]*tcpListener, 0, 2) - s.fsmincomingCh = channels.NewInfiniteChannel() - s.fsmStateCh = make(chan *fsmMsg, 4096) - - handlefsmMsg := func(e *fsmMsg) { - peer, found := s.neighborMap[e.MsgSrc] - if !found { +func (s *BgpServer) passConnToPeer(conn *net.TCPConn) { + host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + ipaddr, _ := net.ResolveIPAddr("ip", host) + remoteAddr := ipaddr.String() + peer, found := s.neighborMap[remoteAddr] + if found { + peer.fsm.lock.RLock() + adminStateNotUp := peer.fsm.adminState != adminStateUp + peer.fsm.lock.RUnlock() + if adminStateNotUp { + peer.fsm.lock.RLock() log.WithFields(log.Fields{ - "Topic": "Peer", - 
}).Warnf("Can't find the neighbor %s", e.MsgSrc) + "Topic": "Peer", + "Remote Addr": remoteAddr, + "Admin State": peer.fsm.adminState, + }).Debug("New connection for non admin-state-up peer") + peer.fsm.lock.RUnlock() + conn.Close() return } peer.fsm.lock.RLock() - versionMismatch := e.Version != peer.fsm.version + localAddr := peer.fsm.pConf.Transport.Config.LocalAddress peer.fsm.lock.RUnlock() - if versionMismatch { + localAddrValid := func(laddr string) bool { + if laddr == "0.0.0.0" || laddr == "::" { + return true + } + l := conn.LocalAddr() + if l == nil { + // already closed + return false + } + + host, _, _ := net.SplitHostPort(l.String()) + if host != laddr { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": remoteAddr, + "Configured addr": laddr, + "Addr": host, + }).Info("Mismatched local address") + return false + } + return true + }(localAddr) + + if !localAddrValid { + conn.Close() + return + } + + log.WithFields(log.Fields{ + "Topic": "Peer", + }).Debugf("Accepted a new passive connection from:%s", remoteAddr) + peer.PassConn(conn) + } else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + }).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr) + rib := s.globalRib + if pg.Conf.RouteServer.Config.RouteServerClient { + rib = s.rsRib + } + peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy) + if peer == nil { log.WithFields(log.Fields{ "Topic": "Peer", - }).Debug("FSM version inconsistent") + "Key": remoteAddr, + }).Infof("Can't create new Dynamic Peer") + conn.Close() return } - s.handleFSMMessage(peer, e) + s.addIncoming(peer.fsm.incomingCh) + peer.fsm.lock.RLock() + policy := peer.fsm.pConf.ApplyPolicy + peer.fsm.lock.RUnlock() + s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy}) + s.neighborMap[remoteAddr] = peer + peer.startFSMHandler() + s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil) + peer.PassConn(conn) + } 
else { + log.WithFields(log.Fields{ + "Topic": "Peer", + }).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr) + conn.Close() } +} - for { - passConn := func(conn *net.TCPConn) { - host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - ipaddr, _ := net.ResolveIPAddr("ip", host) - remoteAddr := ipaddr.String() - peer, found := s.neighborMap[remoteAddr] - if found { - peer.fsm.lock.RLock() - adminStateNotUp := peer.fsm.adminState != adminStateUp - peer.fsm.lock.RUnlock() - if adminStateNotUp { - peer.fsm.lock.RLock() - log.WithFields(log.Fields{ - "Topic": "Peer", - "Remote Addr": remoteAddr, - "Admin State": peer.fsm.adminState, - }).Debug("New connection for non admin-state-up peer") - peer.fsm.lock.RUnlock() - conn.Close() - return - } - peer.fsm.lock.RLock() - localAddr := peer.fsm.pConf.Transport.Config.LocalAddress - peer.fsm.lock.RUnlock() - localAddrValid := func(laddr string) bool { - if laddr == "0.0.0.0" || laddr == "::" { - return true - } - l := conn.LocalAddr() - if l == nil { - // already closed - return false - } +const firstPeerCaseIndex = 3 - host, _, _ := net.SplitHostPort(l.String()) - if host != laddr { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": remoteAddr, - "Configured addr": laddr, - "Addr": host, - }).Info("Mismatched local address") - return false - } - return true - }(localAddr) +func (s *BgpServer) Serve() { + s.listeners = make([]*tcpListener, 0, 2) - if !localAddrValid { - conn.Close() - return - } + handlefsmMsg := func(e *fsmMsg) { + fsm := e.fsm + if fsm.h.ctx.Err() != nil { + // canceled + addr := fsm.pConf.State.NeighborAddress + state := fsm.state - log.WithFields(log.Fields{ - "Topic": "Peer", - }).Debugf("Accepted a new passive connection from:%s", remoteAddr) - peer.PassConn(conn) - } else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil { - log.WithFields(log.Fields{ - "Topic": "Peer", - }).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr) - rib 
:= s.globalRib - if pg.Conf.RouteServer.Config.RouteServerClient { - rib = s.rsRib - } - peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy) - if peer == nil { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": remoteAddr, - }).Infof("Can't create new Dynamic Peer") - conn.Close() - return - } - peer.fsm.lock.RLock() - policy := peer.fsm.pConf.ApplyPolicy - peer.fsm.lock.RUnlock() - s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy}) - s.neighborMap[remoteAddr] = peer - peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh) - s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil) - peer.PassConn(conn) - } else { - log.WithFields(log.Fields{ - "Topic": "Peer", - }).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr) - conn.Close() + fsm.h.wg.Wait() + + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": addr, + "State": state, + }).Debug("freed fsm.h") + + cleanInfiniteChannel(fsm.outgoingCh) + cleanInfiniteChannel(fsm.incomingCh) + s.delIncoming(fsm.incomingCh) + if s.shutdownWG != nil && len(s.incomings) == 0 { + s.shutdownWG.Done() } + return } - select { - case op := <-s.mgmtCh: - s.handleMGMTOp(op) - case conn := <-s.acceptCh: - passConn(conn) - default: + peer, found := s.neighborMap[e.MsgSrc] + if !found { + log.WithFields(log.Fields{ + "Topic": "Peer", + }).Warnf("Can't find the neighbor %s", e.MsgSrc) + return } + s.handleFSMMessage(peer, e) + } - for { - select { - case e := <-s.fsmStateCh: - handlefsmMsg(e) - default: - goto CONT + for { + cases := make([]reflect.SelectCase, firstPeerCaseIndex+len(s.incomings)) + cases[0] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.mgmtCh), + } + cases[1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.acceptCh), + } + cases[2] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.roaManager.ReceiveROA()), + } + for i := firstPeerCaseIndex; i < len(cases); i++ { + 
cases[i] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex].Out()), } } - CONT: - select { - case op := <-s.mgmtCh: + chosen, value, ok := reflect.Select(cases) + switch chosen { + case 0: + op := value.Interface().(*mgmtOp) s.handleMGMTOp(op) - case rmsg := <-s.roaManager.ReceiveROA(): - s.roaManager.HandleROAEvent(rmsg) - case conn := <-s.acceptCh: - passConn(conn) - case e, ok := <-s.fsmincomingCh.Out(): - if !ok { - continue + case 1: + conn := value.Interface().(*net.TCPConn) + s.passConnToPeer(conn) + case 2: + ev := value.Interface().(*roaEvent) + s.roaManager.HandleROAEvent(ev) + default: + // in the case of dynamic peer, handleFSMMessage closed incoming channel so + // nil fsmMsg can happen here. + if ok { + e := value.Interface().(*fsmMsg) + handlefsmMsg(e) } - handlefsmMsg(e.(*fsmMsg)) - case e := <-s.fsmStateCh: - handlefsmMsg(e) } } } @@ -385,7 +418,7 @@ func (s *BgpServer) matchLongestDynamicNeighborPrefix(a string) *peerGroup { } func sendfsmOutgoingMsg(peer *peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) { - peer.outgoing.In() <- &fsmOutgoingMsg{ + peer.fsm.outgoingCh.In() <- &fsmOutgoingMsg{ Paths: paths, Notification: notification, StayIdle: stayIdle, @@ -1155,10 +1188,11 @@ func (s *BgpServer) propagateUpdateToNeighbors(source *peer, newPath *table.Path func (s *BgpServer) deleteDynamicNeighbor(peer *peer, oldState bgp.FSMState, e *fsmMsg) { peer.stopPeerRestarting() - go peer.stopFSM() peer.fsm.lock.RLock() delete(s.neighborMap, peer.fsm.pConf.State.NeighborAddress) peer.fsm.lock.RUnlock() + cleanInfiniteChannel(peer.fsm.outgoingCh) + cleanInfiniteChannel(peer.fsm.incomingCh) s.broadcastPeerState(peer, oldState, e) } @@ -1293,8 +1327,8 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) { } } - cleanInfiniteChannel(peer.outgoing) - peer.outgoing = channels.NewInfiniteChannel() + cleanInfiniteChannel(peer.fsm.outgoingCh) + peer.fsm.outgoingCh = 
channels.NewInfiniteChannel() if nextState == bgp.BGP_FSM_ESTABLISHED { // update for export policy laddr, _ := peer.fsm.LocalHostPort() @@ -1393,21 +1427,6 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) { } } } else { - if s.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE { - die := true - for _, p := range s.neighborMap { - p.fsm.lock.RLock() - stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE - p.fsm.lock.RUnlock() - if stateNotIdle { - die = false - break - } - } - if die { - s.shutdownWG.Done() - } - } peer.fsm.lock.Lock() peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix() peer.fsm.lock.Unlock() @@ -1424,7 +1443,7 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) { peer.fsm.pConf.Timers.State = config.TimersState{} peer.fsm.lock.Unlock() } - peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh) + peer.startFSMHandler() s.broadcastPeerState(peer, oldState, e) case fsmMsgRouteRefresh: peer.fsm.lock.RLock() @@ -1624,12 +1643,18 @@ func (s *BgpServer) DeleteBmp(ctx context.Context, r *api.DeleteBmpRequest) erro func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error { s.mgmtOperation(func() error { - s.shutdownWG = new(sync.WaitGroup) - s.shutdownWG.Add(1) - + names := make([]string, 0, len(s.neighborMap)) for k := range s.neighborMap { + names = append(names, k) + } + + if len(names) != 0 { + s.shutdownWG = new(sync.WaitGroup) + s.shutdownWG.Add(1) + } + for _, name := range names { if err := s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{ - NeighborAddress: k}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil { + NeighborAddress: name}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil { return err } } @@ -1640,13 +1665,9 @@ func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error { return nil }, false) - // Waits for all goroutines per peer to stop. 
- // Note: This should not be wrapped with s.mgmtOperation() in order to - // avoid the deadlock in the main goroutine of BgpServer. - // if s.shutdownWG != nil { - // s.shutdownWG.Wait() - // s.shutdownWG = nil - // } + if s.shutdownWG != nil { + s.shutdownWG.Wait() + } return nil } @@ -2636,12 +2657,13 @@ func (s *BgpServer) addNeighbor(c *config.Neighbor) error { rib = s.rsRib } peer := newPeer(&s.bgpConfig.Global, c, rib, s.policy) + s.addIncoming(peer.fsm.incomingCh) s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): c.ApplyPolicy}) s.neighborMap[addr] = peer if name := c.Config.PeerGroup; name != "" { s.peerGroupMap[name].AddMember(*c) } - peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh) + peer.startFSMHandler() s.broadcastPeerState(peer, bgp.BGP_FSM_IDLE, nil) return nil } @@ -2727,10 +2749,10 @@ func (s *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) erro "Topic": "Peer", }).Infof("Delete a peer configuration for:%s", addr) - n.fsm.sendNotification(code, subcode, nil, "") n.stopPeerRestarting() + n.fsm.notification <- bgp.NewBGPNotificationMessage(code, subcode, nil) + n.fsm.h.ctxCancel() - go n.stopFSM() delete(s.neighborMap, addr) s.dropPeerAllRoutes(n, n.configuredRFlist()) return nil |