-rw-r--r--  pkg/server/fsm.go         |  49
-rw-r--r--  pkg/server/fsm_test.go    |   5
-rw-r--r--  pkg/server/peer.go        |  33
-rw-r--r--  pkg/server/server.go      | 344
-rw-r--r--  pkg/server/server_test.go | 103
5 files changed, 278 insertions, 256 deletions
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go
index 6eadfa92..e66054c7 100644
--- a/pkg/server/fsm.go
+++ b/pkg/server/fsm.go
@@ -123,13 +123,13 @@ const (
 
 type fsmMsg struct {
 	MsgType fsmMsgType
+	fsm *fsm
 	MsgSrc string
 	MsgData interface{}
 	StateReason *fsmStateReason
 	PathList []*table.Path
 	timestamp time.Time
 	payload []byte
-	Version uint
 }
 
 type fsmOutgoingMsg struct {
@@ -169,13 +169,13 @@ type adminStateOperation struct {
 	Communication []byte
 }
 
-var fsmVersion uint
-
 type fsm struct {
 	gConf *config.Global
 	pConf *config.Neighbor
 	lock sync.RWMutex
 	state bgp.FSMState
+	outgoingCh *channels.InfiniteChannel
+	incomingCh *channels.InfiniteChannel
 	reason *fsmStateReason
 	conn net.Conn
 	connCh chan net.Conn
@@ -191,8 +191,8 @@ type fsm struct {
 	policy *table.RoutingPolicy
 	gracefulRestartTimer *time.Timer
 	twoByteAsTrans bool
-	version uint
 	marshallingOptions *bgp.MarshallingOption
+	notification chan *bgp.BGPMessage
 }
 
 func (fsm *fsm) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
@@ -267,11 +267,12 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
 	}
 	pConf.State.SessionState = config.IntToSessionStateMap[int(bgp.BGP_FSM_IDLE)]
 	pConf.Timers.State.Downtime = time.Now().Unix()
-	fsmVersion++
 	fsm := &fsm{
 		gConf: gConf,
 		pConf: pConf,
 		state: bgp.BGP_FSM_IDLE,
+		outgoingCh: channels.NewInfiniteChannel(),
+		incomingCh: channels.NewInfiniteChannel(),
 		connCh: make(chan net.Conn, 1),
 		opensentHoldTime: float64(holdtimeOpensent),
 		adminState: adminState,
@@ -281,7 +282,7 @@ func newFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingP
 		peerInfo: table.NewPeerInfo(gConf, pConf),
 		policy: policy,
 		gracefulRestartTimer: time.NewTimer(time.Hour),
-		version: fsmVersion,
+		notification: make(chan *bgp.BGPMessage, 1),
 	}
 	fsm.gracefulRestartTimer.Stop()
 	return fsm
@@ -382,7 +383,6 @@ type fsmHandler struct {
 	msgCh *channels.InfiniteChannel
 	stateReasonCh chan fsmStateReason
 	incoming *channels.InfiniteChannel
-	stateCh chan *fsmMsg
 	outgoing *channels.InfiniteChannel
 	holdTimerResetCh chan bool
 	sentNotification *bgp.BGPMessage
@@ -391,13 +391,12 @@ type fsmHandler struct {
 	wg *sync.WaitGroup
 }
 
-func newFSMHandler(fsm *fsm, incoming *channels.InfiniteChannel, stateCh chan *fsmMsg, outgoing *channels.InfiniteChannel) *fsmHandler {
+func newFSMHandler(fsm *fsm, outgoing *channels.InfiniteChannel) *fsmHandler {
 	ctx, cancel := context.WithCancel(context.Background())
 	h := &fsmHandler{
 		fsm: fsm,
 		stateReasonCh: make(chan fsmStateReason, 2),
-		incoming: incoming,
-		stateCh: stateCh,
+		incoming: fsm.incomingCh,
 		outgoing: outgoing,
 		holdTimerResetCh: make(chan bool, 2),
 		wg: &sync.WaitGroup{},
@@ -944,10 +943,10 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
 			"error": err,
 		}).Warn("Session will be reset due to malformed BGP Header")
 		fmsg := &fsmMsg{
+			fsm: h.fsm,
 			MsgType: fsmMsgBGPMessage,
 			MsgSrc: h.fsm.pConf.State.NeighborAddress,
 			MsgData: err,
-			Version: h.fsm.version,
 		}
 		h.fsm.lock.RUnlock()
 		return fmsg, err
@@ -977,10 +976,10 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
 	}
 	h.fsm.lock.RLock()
 	fmsg := &fsmMsg{
+		fsm: h.fsm,
 		MsgType: fsmMsgBGPMessage,
 		MsgSrc: h.fsm.pConf.State.NeighborAddress,
 		timestamp: now,
-		Version: h.fsm.version,
 	}
 	h.fsm.lock.RUnlock()
 
@@ -1757,6 +1756,13 @@ func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateRe
 	for {
 		select {
 		case <-ctx.Done():
+			select {
+			case m := <-fsm.notification:
+				b, _ := m.Serialize(h.fsm.marshallingOptions)
+				h.conn.Write(b)
+			default:
+				// nothing to do
+			}
 			h.conn.Close()
 			return -1, newfsmStateReason(fsmDying, nil, nil)
 		case conn, ok := <-fsm.connCh:
@@ -1881,18 +1887,15 @@ func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error {
 	}
 	fsm.lock.RUnlock()
 
-	// under zero means that the context was canceled.
-	if nextState >= bgp.BGP_FSM_IDLE {
-		fsm.lock.RLock()
-		h.stateCh <- &fsmMsg{
-			MsgType: fsmMsgStateChange,
-			MsgSrc: fsm.pConf.State.NeighborAddress,
-			MsgData: nextState,
-			StateReason: reason,
-			Version: h.fsm.version,
-		}
-		fsm.lock.RUnlock()
+	fsm.lock.RLock()
+	h.incoming.In() <- &fsmMsg{
+		fsm: fsm,
+		MsgType: fsmMsgStateChange,
+		MsgSrc: fsm.pConf.State.NeighborAddress,
+		MsgData: nextState,
+		StateReason: reason,
 	}
+	fsm.lock.RUnlock()
 
 	return nil
 }
diff --git a/pkg/server/fsm_test.go b/pkg/server/fsm_test.go
index cd3eb7e2..ac5ffd7b 100644
--- a/pkg/server/fsm_test.go
+++ b/pkg/server/fsm_test.go
@@ -311,15 +311,14 @@ func TestCheckOwnASLoop(t *testing.T) {
 
 func makePeerAndHandler() (*peer, *fsmHandler) {
 	p := &peer{
-		fsm: newFSM(&config.Global{}, &config.Neighbor{}, table.NewRoutingPolicy()),
-		outgoing: channels.NewInfiniteChannel(),
+		fsm: newFSM(&config.Global{}, &config.Neighbor{}, table.NewRoutingPolicy()),
 	}
 
 	h := &fsmHandler{
 		fsm: p.fsm,
 		stateReasonCh: make(chan fsmStateReason, 2),
 		incoming: channels.NewInfiniteChannel(),
-		outgoing: p.outgoing,
+		outgoing: channels.NewInfiniteChannel(),
 	}
 
 	return p, h
diff --git a/pkg/server/peer.go b/pkg/server/peer.go
index b4a7b23c..d7fb46c3 100644
--- a/pkg/server/peer.go
+++ b/pkg/server/peer.go
@@ -24,7 +24,6 @@ import (
 	"github.com/osrg/gobgp/internal/pkg/table"
 	"github.com/osrg/gobgp/pkg/packet/bgp"
 
-	"github.com/eapache/channels"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -97,7 +96,6 @@ type peer struct {
 	tableId string
 	fsm *fsm
 	adjRibIn *table.AdjRib
-	outgoing *channels.InfiniteChannel
 	policy *table.RoutingPolicy
 	localRib *table.TableManager
 	prefixLimitWarned map[bgp.RouteFamily]bool
@@ -106,7 +104,6 @@ type peer struct {
 
 func newPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, policy *table.RoutingPolicy) *peer {
 	peer := &peer{
-		outgoing: channels.NewInfiniteChannel(),
 		localRib: loc,
 		policy: policy,
 		fsm: newFSM(g, conf, policy),
@@ -528,8 +525,8 @@ func (peer *peer) handleUpdate(e *fsmMsg) ([]*table.Path, []bgp.RouteFamily, *bg
 	return nil, nil, nil
 }
 
-func (peer *peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *fsmMsg) {
-	handler := newFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
+func (peer *peer) startFSMHandler() {
+	handler := newFSMHandler(peer.fsm, peer.fsm.outgoingCh)
 	peer.fsm.lock.Lock()
 	peer.fsm.h = handler
 	peer.fsm.lock.Unlock()
@@ -554,29 +551,3 @@ func (peer *peer) PassConn(conn *net.TCPConn) {
 func (peer *peer) DropAll(rfList []bgp.RouteFamily) {
 	peer.adjRibIn.Drop(rfList)
 }
-
-func (peer *peer) stopFSM() error {
-	failed := false
-	peer.fsm.lock.RLock()
-	addr := peer.fsm.pConf.State.NeighborAddress
-	state := peer.fsm.state
-	peer.fsm.lock.RUnlock()
-	t1 := time.AfterFunc(time.Minute*5, func() {
-		log.WithFields(log.Fields{
-			"Topic": "Peer",
-		}).Warnf("Failed to free the fsm.h.t for %s", addr)
-		failed = true
-	})
-	peer.fsm.h.ctxCancel()
-	peer.fsm.h.wg.Wait()
-	t1.Stop()
-	if !failed {
-		log.WithFields(log.Fields{
-			"Topic": "Peer",
-			"Key": addr,
-			"State": state,
-		}).Debug("freed fsm.h.t")
-		cleanInfiniteChannel(peer.outgoing)
-	}
-	return fmt.Errorf("failed to free FSM for %s", addr)
-}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7e7c370f..cdc557b5 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"reflect"
 	"strconv"
 	"sync"
 	"time"
@@ -117,11 +118,9 @@ func GrpcOption(opt []grpc.ServerOption) ServerOption {
 }
 
 type BgpServer struct {
-	bgpConfig config.Bgp
-	fsmincomingCh *channels.InfiniteChannel
-	fsmStateCh chan *fsmMsg
-	acceptCh chan *net.TCPConn
-
+	bgpConfig config.Bgp
+	acceptCh chan *net.TCPConn
+	incomings []*channels.InfiniteChannel
 	mgmtCh chan *mgmtOp
 	policy *table.RoutingPolicy
 	listeners []*tcpListener
@@ -169,6 +168,19 @@ func NewBgpServer(opt ...ServerOption) *BgpServer {
 	return s
 }
 
+func (s *BgpServer) addIncoming(ch *channels.InfiniteChannel) {
+	s.incomings = append(s.incomings, ch)
+}
+
+func (s *BgpServer) delIncoming(ch *channels.InfiniteChannel) {
+	for i, c := range s.incomings {
+		if c == ch {
+			s.incomings = append(s.incomings[:i], s.incomings[i+1:]...)
+			return
+		}
+	}
+}
+
 func (s *BgpServer) listListeners(addr string) []*net.TCPListener {
 	list := make([]*net.TCPListener, 0, len(s.listeners))
 	rhs := net.ParseIP(addr).To4() != nil
@@ -216,152 +228,173 @@ func (s *BgpServer) mgmtOperation(f func() error, checkActive bool) (err error)
 	return
 }
 
-func (s *BgpServer) Serve() {
-	s.listeners = make([]*tcpListener, 0, 2)
-	s.fsmincomingCh = channels.NewInfiniteChannel()
-	s.fsmStateCh = make(chan *fsmMsg, 4096)
-
-	handlefsmMsg := func(e *fsmMsg) {
-		peer, found := s.neighborMap[e.MsgSrc]
-		if !found {
+func (s *BgpServer) passConnToPeer(conn *net.TCPConn) {
+	host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
+	ipaddr, _ := net.ResolveIPAddr("ip", host)
+	remoteAddr := ipaddr.String()
+	peer, found := s.neighborMap[remoteAddr]
+	if found {
+		peer.fsm.lock.RLock()
+		adminStateNotUp := peer.fsm.adminState != adminStateUp
+		peer.fsm.lock.RUnlock()
+		if adminStateNotUp {
+			peer.fsm.lock.RLock()
 			log.WithFields(log.Fields{
-				"Topic": "Peer",
-			}).Warnf("Can't find the neighbor %s", e.MsgSrc)
+				"Topic": "Peer",
+				"Remote Addr": remoteAddr,
+				"Admin State": peer.fsm.adminState,
+			}).Debug("New connection for non admin-state-up peer")
+			peer.fsm.lock.RUnlock()
+			conn.Close()
 			return
 		}
 		peer.fsm.lock.RLock()
-		versionMismatch := e.Version != peer.fsm.version
+		localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
 		peer.fsm.lock.RUnlock()
-		if versionMismatch {
+		localAddrValid := func(laddr string) bool {
+			if laddr == "0.0.0.0" || laddr == "::" {
+				return true
+			}
+			l := conn.LocalAddr()
+			if l == nil {
+				// already closed
+				return false
+			}
+
+			host, _, _ := net.SplitHostPort(l.String())
+			if host != laddr {
+				log.WithFields(log.Fields{
+					"Topic": "Peer",
+					"Key": remoteAddr,
+					"Configured addr": laddr,
+					"Addr": host,
+				}).Info("Mismatched local address")
+				return false
+			}
+			return true
+		}(localAddr)
+
+		if !localAddrValid {
+			conn.Close()
+			return
+		}
+
+		log.WithFields(log.Fields{
+			"Topic": "Peer",
+		}).Debugf("Accepted a new passive connection from:%s", remoteAddr)
+		peer.PassConn(conn)
+	} else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil {
+		log.WithFields(log.Fields{
+			"Topic": "Peer",
+		}).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr)
+		rib := s.globalRib
+		if pg.Conf.RouteServer.Config.RouteServerClient {
+			rib = s.rsRib
+		}
+		peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy)
+		if peer == nil {
 			log.WithFields(log.Fields{
 				"Topic": "Peer",
-			}).Debug("FSM version inconsistent")
+				"Key": remoteAddr,
+			}).Infof("Can't create new Dynamic Peer")
+			conn.Close()
 			return
 		}
-		s.handleFSMMessage(peer, e)
+		s.addIncoming(peer.fsm.incomingCh)
+		peer.fsm.lock.RLock()
+		policy := peer.fsm.pConf.ApplyPolicy
+		peer.fsm.lock.RUnlock()
+		s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
+		s.neighborMap[remoteAddr] = peer
+		peer.startFSMHandler()
+		s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
+		peer.PassConn(conn)
+	} else {
+		log.WithFields(log.Fields{
+			"Topic": "Peer",
+		}).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr)
+		conn.Close()
 	}
+}
 
-	for {
-		passConn := func(conn *net.TCPConn) {
-			host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
-			ipaddr, _ := net.ResolveIPAddr("ip", host)
-			remoteAddr := ipaddr.String()
-			peer, found := s.neighborMap[remoteAddr]
-			if found {
-				peer.fsm.lock.RLock()
-				adminStateNotUp := peer.fsm.adminState != adminStateUp
-				peer.fsm.lock.RUnlock()
-				if adminStateNotUp {
-					peer.fsm.lock.RLock()
-					log.WithFields(log.Fields{
-						"Topic": "Peer",
-						"Remote Addr": remoteAddr,
-						"Admin State": peer.fsm.adminState,
-					}).Debug("New connection for non admin-state-up peer")
-					peer.fsm.lock.RUnlock()
-					conn.Close()
-					return
-				}
-				peer.fsm.lock.RLock()
-				localAddr := peer.fsm.pConf.Transport.Config.LocalAddress
-				peer.fsm.lock.RUnlock()
-				localAddrValid := func(laddr string) bool {
-					if laddr == "0.0.0.0" || laddr == "::" {
-						return true
-					}
-					l := conn.LocalAddr()
-					if l == nil {
-						// already closed
-						return false
-					}
-
-					host, _, _ := net.SplitHostPort(l.String())
-					if host != laddr {
-						log.WithFields(log.Fields{
-							"Topic": "Peer",
-							"Key": remoteAddr,
-							"Configured addr": laddr,
-							"Addr": host,
-						}).Info("Mismatched local address")
-						return false
-					}
-					return true
-				}(localAddr)
+const firstPeerCaseIndex = 3
 
-				if !localAddrValid {
-					conn.Close()
-					return
-				}
+func (s *BgpServer) Serve() {
+	s.listeners = make([]*tcpListener, 0, 2)
 
-				log.WithFields(log.Fields{
-					"Topic": "Peer",
-				}).Debugf("Accepted a new passive connection from:%s", remoteAddr)
-				peer.PassConn(conn)
-			} else if pg := s.matchLongestDynamicNeighborPrefix(remoteAddr); pg != nil {
-				log.WithFields(log.Fields{
-					"Topic": "Peer",
-				}).Debugf("Accepted a new dynamic neighbor from:%s", remoteAddr)
-				rib := s.globalRib
-				if pg.Conf.RouteServer.Config.RouteServerClient {
-					rib = s.rsRib
-				}
-				peer := newDynamicPeer(&s.bgpConfig.Global, remoteAddr, pg.Conf, rib, s.policy)
-				if peer == nil {
-					log.WithFields(log.Fields{
-						"Topic": "Peer",
-						"Key": remoteAddr,
-					}).Infof("Can't create new Dynamic Peer")
-					conn.Close()
-					return
-				}
-				peer.fsm.lock.RLock()
-				policy := peer.fsm.pConf.ApplyPolicy
-				peer.fsm.lock.RUnlock()
-				s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): policy})
-				s.neighborMap[remoteAddr] = peer
-				peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
-				s.broadcastPeerState(peer, bgp.BGP_FSM_ACTIVE, nil)
-				peer.PassConn(conn)
-			} else {
-				log.WithFields(log.Fields{
-					"Topic": "Peer",
-				}).Infof("Can't find configuration for a new passive connection from:%s", remoteAddr)
-				conn.Close()
-			}
-		}
+	handlefsmMsg := func(e *fsmMsg) {
+		fsm := e.fsm
+		if fsm.h.ctx.Err() != nil {
+			// canceled
+			addr := fsm.pConf.State.NeighborAddress
+			state := fsm.state
+
+			fsm.h.wg.Wait()
+
+			log.WithFields(log.Fields{
+				"Topic": "Peer",
+				"Key": addr,
+				"State": state,
+			}).Debug("freed fsm.h")
+
+			cleanInfiniteChannel(fsm.outgoingCh)
+			cleanInfiniteChannel(fsm.incomingCh)
+			s.delIncoming(fsm.incomingCh)
+			if s.shutdownWG != nil && len(s.incomings) == 0 {
+				s.shutdownWG.Done()
+			}
+			return
+		}
 
-		select {
-		case op := <-s.mgmtCh:
 			s.handleMGMTOp(op)
-		case conn := <-s.acceptCh:
-			passConn(conn)
-		default:
+		peer, found := s.neighborMap[e.MsgSrc]
+		if !found {
+			log.WithFields(log.Fields{
+				"Topic": "Peer",
+			}).Warnf("Can't find the neighbor %s", e.MsgSrc)
+			return
 		}
+		s.handleFSMMessage(peer, e)
+	}
 
-		for {
-			select {
-			case e := <-s.fsmStateCh:
-				handlefsmMsg(e)
-			default:
-				goto CONT
+	for {
+		cases := make([]reflect.SelectCase, firstPeerCaseIndex+len(s.incomings))
+		cases[0] = reflect.SelectCase{
+			Dir: reflect.SelectRecv,
+			Chan: reflect.ValueOf(s.mgmtCh),
+		}
+		cases[1] = reflect.SelectCase{
+			Dir: reflect.SelectRecv,
+			Chan: reflect.ValueOf(s.acceptCh),
+		}
+		cases[2] = reflect.SelectCase{
+			Dir: reflect.SelectRecv,
+			Chan: reflect.ValueOf(s.roaManager.ReceiveROA()),
+		}
+		for i := firstPeerCaseIndex; i < len(cases); i++ {
+			cases[i] = reflect.SelectCase{
+				Dir: reflect.SelectRecv,
+				Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex].Out()),
 			}
 		}
-	CONT:
-		select {
-		case op := <-s.mgmtCh:
+
+		chosen, value, ok := reflect.Select(cases)
+		switch chosen {
+		case 0:
+			op := value.Interface().(*mgmtOp)
 			s.handleMGMTOp(op)
-		case rmsg := <-s.roaManager.ReceiveROA():
-			s.roaManager.HandleROAEvent(rmsg)
-		case conn := <-s.acceptCh:
-			passConn(conn)
-		case e, ok := <-s.fsmincomingCh.Out():
-			if !ok {
-				continue
+		case 1:
+			conn := value.Interface().(*net.TCPConn)
+			s.passConnToPeer(conn)
+		case 2:
+			ev := value.Interface().(*roaEvent)
+			s.roaManager.HandleROAEvent(ev)
+		default:
+			// in the case of dynamic peer, handleFSMMessage closed incoming channel so
+			// nil fsmMsg can happen here.
+			if ok {
+				e := value.Interface().(*fsmMsg)
+				handlefsmMsg(e)
 			}
-			handlefsmMsg(e.(*fsmMsg))
-		case e := <-s.fsmStateCh:
-			handlefsmMsg(e)
 		}
 	}
 }
@@ -385,7 +418,7 @@ func (s *BgpServer) matchLongestDynamicNeighborPrefix(a string) *peerGroup {
 }
 
 func sendfsmOutgoingMsg(peer *peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) {
-	peer.outgoing.In() <- &fsmOutgoingMsg{
+	peer.fsm.outgoingCh.In() <- &fsmOutgoingMsg{
 		Paths: paths,
 		Notification: notification,
 		StayIdle: stayIdle,
@@ -1155,10 +1188,11 @@ func (s *BgpServer) propagateUpdateToNeighbors(source *peer, newPath *table.Path
 
 func (s *BgpServer) deleteDynamicNeighbor(peer *peer, oldState bgp.FSMState, e *fsmMsg) {
 	peer.stopPeerRestarting()
-	go peer.stopFSM()
 	peer.fsm.lock.RLock()
 	delete(s.neighborMap, peer.fsm.pConf.State.NeighborAddress)
 	peer.fsm.lock.RUnlock()
+	cleanInfiniteChannel(peer.fsm.outgoingCh)
+	cleanInfiniteChannel(peer.fsm.incomingCh)
 	s.broadcastPeerState(peer, oldState, e)
 }
 
@@ -1293,8 +1327,8 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
 			}
 		}
 
-		cleanInfiniteChannel(peer.outgoing)
-		peer.outgoing = channels.NewInfiniteChannel()
+		cleanInfiniteChannel(peer.fsm.outgoingCh)
+		peer.fsm.outgoingCh = channels.NewInfiniteChannel()
 		if nextState == bgp.BGP_FSM_ESTABLISHED {
 			// update for export policy
 			laddr, _ := peer.fsm.LocalHostPort()
@@ -1393,21 +1427,6 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
 				}
 			}
 		} else {
-			if s.shutdownWG != nil && nextState == bgp.BGP_FSM_IDLE {
-				die := true
-				for _, p := range s.neighborMap {
-					p.fsm.lock.RLock()
-					stateNotIdle := p.fsm.state != bgp.BGP_FSM_IDLE
-					p.fsm.lock.RUnlock()
-					if stateNotIdle {
-						die = false
-						break
-					}
-				}
-				if die {
-					s.shutdownWG.Done()
-				}
-			}
 			peer.fsm.lock.Lock()
 			peer.fsm.pConf.Timers.State.Downtime = time.Now().Unix()
 			peer.fsm.lock.Unlock()
@@ -1424,7 +1443,7 @@ func (s *BgpServer) handleFSMMessage(peer *peer, e *fsmMsg) {
 			peer.fsm.pConf.Timers.State = config.TimersState{}
 			peer.fsm.lock.Unlock()
 		}
-		peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
+		peer.startFSMHandler()
 		s.broadcastPeerState(peer, oldState, e)
 	case fsmMsgRouteRefresh:
 		peer.fsm.lock.RLock()
@@ -1624,12 +1643,18 @@ func (s *BgpServer) DeleteBmp(ctx context.Context, r *api.DeleteBmpRequest) erro
 
 func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error {
 	s.mgmtOperation(func() error {
-		s.shutdownWG = new(sync.WaitGroup)
-		s.shutdownWG.Add(1)
-
+		names := make([]string, 0, len(s.neighborMap))
 		for k := range s.neighborMap {
+			names = append(names, k)
+		}
+
+		if len(names) != 0 {
+			s.shutdownWG = new(sync.WaitGroup)
+			s.shutdownWG.Add(1)
+		}
+		for _, name := range names {
 			if err := s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{
-				NeighborAddress: k}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
+				NeighborAddress: name}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
 				return err
 			}
 		}
@@ -1640,13 +1665,9 @@ func (s *BgpServer) StopBgp(ctx context.Context, r *api.StopBgpRequest) error {
 		return nil
 	}, false)
 
-	// Waits for all goroutines per peer to stop.
-	// Note: This should not be wrapped with s.mgmtOperation() in order to
-	// avoid the deadlock in the main goroutine of BgpServer.
-	// if s.shutdownWG != nil {
-	// 	s.shutdownWG.Wait()
-	// 	s.shutdownWG = nil
-	// }
+	if s.shutdownWG != nil {
+		s.shutdownWG.Wait()
+	}
 
 	return nil
 }
@@ -2636,12 +2657,13 @@ func (s *BgpServer) addNeighbor(c *config.Neighbor) error {
 		rib = s.rsRib
 	}
 	peer := newPeer(&s.bgpConfig.Global, c, rib, s.policy)
+	s.addIncoming(peer.fsm.incomingCh)
 	s.policy.Reset(nil, map[string]config.ApplyPolicy{peer.ID(): c.ApplyPolicy})
 	s.neighborMap[addr] = peer
 	if name := c.Config.PeerGroup; name != "" {
 		s.peerGroupMap[name].AddMember(*c)
 	}
-	peer.startFSMHandler(s.fsmincomingCh, s.fsmStateCh)
+	peer.startFSMHandler()
 	s.broadcastPeerState(peer, bgp.BGP_FSM_IDLE, nil)
 	return nil
 }
@@ -2727,10 +2749,10 @@ func (s *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) erro
 		"Topic": "Peer",
 	}).Infof("Delete a peer configuration for:%s", addr)
 
-	n.fsm.sendNotification(code, subcode, nil, "")
 	n.stopPeerRestarting()
+	n.fsm.notification <- bgp.NewBGPNotificationMessage(code, subcode, nil)
+	n.fsm.h.ctxCancel()
 
-	go n.stopFSM()
 	delete(s.neighborMap, addr)
 	s.dropPeerAllRoutes(n, n.configuredRFlist())
 	return nil
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index 54a2dd00..f325f3ad 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -36,6 +36,44 @@ import (
 	"github.com/osrg/gobgp/pkg/packet/bgp"
 )
 
+func TestStop(t *testing.T) {
+	assert := assert.New(t)
+	s := NewBgpServer()
+	go s.Serve()
+	err := s.StartBgp(context.Background(), &api.StartBgpRequest{
+		Global: &api.Global{
+			As: 1,
+			RouterId: "1.1.1.1",
+			ListenPort: -1,
+		},
+	})
+	assert.Nil(err)
+	s.StopBgp(context.Background(), &api.StopBgpRequest{})
+
+	s = NewBgpServer()
+	go s.Serve()
+	err = s.StartBgp(context.Background(), &api.StartBgpRequest{
+		Global: &api.Global{
+			As: 1,
+			RouterId: "1.1.1.1",
+			ListenPort: -1,
+		},
+	})
+	assert.Nil(err)
+	p := &api.Peer{
+		Conf: &api.PeerConf{
+			NeighborAddress: "2.2.2.2",
+			PeerAs: 1,
+		},
+		RouteServer: &api.RouteServer{
+			RouteServerClient: true,
+		},
+	}
+	err = s.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p})
+	assert.Nil(err)
+	s.StopBgp(context.Background(), &api.StopBgpRequest{})
+}
+
 func TestModPolicyAssign(t *testing.T) {
 	assert := assert.New(t)
 	s := NewBgpServer()
@@ -184,18 +222,16 @@ func TestMonitor(test *testing.T) {
 	assert.Nil(err)
 	defer s.StopBgp(context.Background(), &api.StopBgpRequest{})
 
-	n := &config.Neighbor{
-		Config: config.NeighborConfig{
+	p1 := &api.Peer{
+		Conf: &api.PeerConf{
 			NeighborAddress: "127.0.0.1",
 			PeerAs: 2,
 		},
-		Transport: config.Transport{
-			Config: config.TransportConfig{
-				PassiveMode: true,
-			},
+		Transport: &api.Transport{
+			PassiveMode: true,
 		},
 	}
-	err = s.addNeighbor(n)
+	err = s.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p1})
 	assert.Nil(err)
 
 	t := NewBgpServer()
@@ -210,15 +246,13 @@ func TestMonitor(test *testing.T) {
 	assert.Nil(err)
 	defer t.StopBgp(context.Background(), &api.StopBgpRequest{})
 
-	m := &config.Neighbor{
-		Config: config.NeighborConfig{
+	p2 := &api.Peer{
+		Conf: &api.PeerConf{
 			NeighborAddress: "127.0.0.1",
 			PeerAs: 1,
 		},
-		Transport: config.Transport{
-			Config: config.TransportConfig{
-				RemotePort: 10179,
-			},
+		Transport: &api.Transport{
+			RemotePort: 10179,
 		},
 	}
 	ch := make(chan struct{})
 	go s.MonitorPeer(context.Background(), &api.MonitorPeerRequest{}, func(peer *api.Peer) {
 		if peer.State.SessionState == api.PeerState_ESTABLISHED {
 			close(ch)
 		}
 	})
 
-	err = t.AddPeer(context.Background(), &api.AddPeerRequest{Peer: config.NewPeerFromConfigStruct(m)})
+	err = t.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p2})
 	assert.Nil(err)
 	<-ch
@@ -689,24 +723,20 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
 	assert.Nil(err)
 	defer s1.StopBgp(context.Background(), &api.StopBgpRequest{})
 
-	n := &config.Neighbor{
-		Config: config.NeighborConfig{
+	p1 := &api.Peer{
+		Conf: &api.PeerConf{
 			NeighborAddress: "127.0.0.1",
 			PeerAs: 2,
 		},
-		Transport: config.Transport{
-			Config: config.TransportConfig{
-				PassiveMode: true,
-			},
+		Transport: &api.Transport{
+			PassiveMode: true,
 		},
-		GracefulRestart: config.GracefulRestart{
-			Config: config.GracefulRestartConfig{
-				Enabled: true,
-				RestartTime: minConnectRetryInterval,
-			},
+		GracefulRestart: &api.GracefulRestart{
+			Enabled: true,
+			RestartTime: minConnectRetryInterval,
 		},
 	}
-	err = s1.addNeighbor(n)
+	err = s1.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p1})
 	assert.Nil(err)
 
 	s2 := NewBgpServer()
@@ -721,30 +751,27 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
 	require.NoError(t, err)
 	defer s2.StopBgp(context.Background(), &api.StopBgpRequest{})
 
-	m := &config.Neighbor{
-		Config: config.NeighborConfig{
+	p2 := &api.Peer{
+		Conf: &api.PeerConf{
 			NeighborAddress: "127.0.0.1",
 			PeerAs: 1,
 		},
-		Transport: config.Transport{
-			Config: config.TransportConfig{
-				RemotePort: 10179,
-			},
+		Transport: &api.Transport{
+			RemotePort: 10179,
 		},
-		GracefulRestart: config.GracefulRestart{
-			Config: config.GracefulRestartConfig{
-				Enabled: true,
-				RestartTime: 1,
-			},
+		GracefulRestart: &api.GracefulRestart{
+			Enabled: true,
			RestartTime: 1,
 		},
 	}
+
 	ch := make(chan struct{})
 	go s2.MonitorPeer(context.Background(), &api.MonitorPeerRequest{}, func(peer *api.Peer) {
 		if peer.State.SessionState == api.PeerState_ESTABLISHED {
 			close(ch)
 		}
 	})
-	err = s2.addNeighbor(m)
+	err = s2.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p2})
 	assert.Nil(err)
 	<-ch
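Note on the main loop change: the commit replaces the old fixed select over fsmincomingCh/fsmStateCh with a reflect.Select over a slice of cases, so each peer's incoming channel gets its own case and channels can be added or removed at runtime. The following is a minimal, self-contained sketch of that pattern, not GoBGP code; the channel names, payload types, and firstDynamicCase constant are invented for illustration and only mirror the role of firstPeerCaseIndex in the patch.

package main

import (
	"fmt"
	"reflect"
)

// firstDynamicCase plays the role of firstPeerCaseIndex in the patch:
// cases[0] and cases[1] are fixed channels, everything after that index is a
// per-peer channel that may be added or removed while the loop is running.
const firstDynamicCase = 2

func main() {
	mgmt := make(chan string, 1)
	accept := make(chan int, 1)
	peers := []chan string{make(chan string, 1), make(chan string, 1)}

	mgmt <- "reload"
	peers[1] <- "peer-1 state change"

	for iter := 0; iter < 2; iter++ {
		// Rebuild the case list on every iteration so channels added or
		// removed since the previous select are picked up.
		cases := make([]reflect.SelectCase, firstDynamicCase+len(peers))
		cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mgmt)}
		cases[1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(accept)}
		for i := firstDynamicCase; i < len(cases); i++ {
			cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(peers[i-firstDynamicCase])}
		}

		chosen, value, ok := reflect.Select(cases)
		switch chosen {
		case 0:
			fmt.Println("mgmt op:", value.Interface().(string))
		case 1:
			fmt.Println("new conn:", value.Interface().(int))
		default:
			// ok is false when the chosen peer channel has been closed,
			// analogous to the dynamic-peer case handled in the patch.
			if ok {
				fmt.Printf("peer %d: %s\n", chosen-firstDynamicCase, value.Interface().(string))
			}
		}
	}
}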