diff options
-rw-r--r-- | server/fsm.go | 6 | ||||
-rw-r--r-- | server/peer.go | 4 | ||||
-rw-r--r-- | server/server.go | 45 | ||||
-rw-r--r-- | test/performance_test/main.go | 4 |
4 files changed, 40 insertions, 19 deletions
diff --git a/server/fsm.go b/server/fsm.go index 1a5943d2..3aaf702f 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -305,16 +305,18 @@ type FSMHandler struct { msgCh chan *FsmMsg errorCh chan bool incoming chan *FsmMsg + stateCh chan *FsmMsg outgoing chan *bgp.BGPMessage holdTimerResetCh chan bool reason string } -func NewFSMHandler(fsm *FSM, incoming chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { +func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan bool, 2), incoming: incoming, + stateCh: stateCh, outgoing: outgoing, holdTimerResetCh: make(chan bool, 2), } @@ -1020,7 +1022,7 @@ func (h *FSMHandler) loop() error { MsgDst: fsm.pConf.Transport.Config.LocalAddress, MsgData: nextState, } - h.incoming <- e + h.stateCh <- e } return nil } diff --git a/server/peer.go b/server/peer.go index 9b7e4a81..22f98e6c 100644 --- a/server/peer.go +++ b/server/peer.go @@ -182,8 +182,8 @@ func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage) return nil, nil } -func (peer *Peer) startFSMHandler(incoming chan *FsmMsg) { - peer.fsm.h = NewFSMHandler(peer.fsm, incoming, peer.outgoing) +func (peer *Peer) startFSMHandler(incoming, stateCh chan *FsmMsg) { + peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing) } func (peer *Peer) PassConn(conn *net.TCPConn) { diff --git a/server/server.go b/server/server.go index afb4550a..00f047ad 100644 --- a/server/server.go +++ b/server/server.go @@ -81,6 +81,7 @@ type BgpServer struct { deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor fsmincomingCh chan *FsmMsg + fsmStateCh chan *FsmMsg rpkiConfigCh chan config.RpkiServers bmpConfigCh chan config.BmpServers @@ -250,12 +251,26 @@ func (server *BgpServer) Serve() { } server.fsmincomingCh = make(chan *FsmMsg, 4096) + server.fsmStateCh = make(chan *FsmMsg, 4096) var senderMsgs []*SenderMsg var zapiMsgCh chan *zebra.Message if server.zclient != nil { zapiMsgCh = server.zclient.Receive() } + + handleFsmMsg := func(e *FsmMsg) { + peer, found := server.neighborMap[e.MsgSrc] + if !found { + log.Warn("Can't find the neighbor ", e.MsgSrc) + return + } + m := server.handleFSMMessage(peer, e) + if len(m) > 0 { + senderMsgs = append(senderMsgs, m...) + } + } + for { var firstMsg *SenderMsg var sCh chan *SenderMsg @@ -319,6 +334,16 @@ func (server *BgpServer) Serve() { default: } + for { + select { + case e := <-server.fsmStateCh: + handleFsmMsg(e) + default: + goto CONT + } + } + CONT: + select { case c := <-server.rpkiConfigCh: server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c) @@ -386,7 +411,7 @@ func (server *BgpServer) Serve() { } } server.neighborMap[addr] = peer - peer.startFSMHandler(server.fsmincomingCh) + peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer) case config := <-server.deletedPeerCh: addr := config.Config.NeighborAddress @@ -419,15 +444,9 @@ func (server *BgpServer) Serve() { peer.conf = config server.setPolicyByConfig(peer.ID(), config.ApplyPolicy) case e := <-server.fsmincomingCh: - peer, found := server.neighborMap[e.MsgSrc] - if !found { - log.Warn("Can't find the neighbor ", e.MsgSrc) - break - } - m := server.handleFSMMessage(peer, e, server.fsmincomingCh) - if len(m) > 0 { - senderMsgs = append(senderMsgs, m...) - } + handleFsmMsg(e) + case e := <-server.fsmStateCh: + handleFsmMsg(e) case sCh <- firstMsg: senderMsgs = senderMsgs[1:] case bCh <- firstBroadcastMsg: @@ -764,7 +783,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] return msgs, alteredPathList } -func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan *FsmMsg) []*SenderMsg { +func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { msgs := make([]*SenderMsg, 0) switch e.MsgType { @@ -831,7 +850,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan * peer.conf.State = config.NeighborState{} peer.conf.Timers.State = config.TimersState{} } - peer.startFSMHandler(incoming) + peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer) case FSM_MSG_BGP_MESSAGE: @@ -2059,7 +2078,7 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S } } server.neighborMap[addr] = peer - peer.startFSMHandler(server.fsmincomingCh) + peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer) case api.Operation_DEL: SetTcpMD5SigSockopts(listener(net.ParseIP(addr)), addr, "") diff --git a/test/performance_test/main.go b/test/performance_test/main.go index 4385dd68..aec15b54 100644 --- a/test/performance_test/main.go +++ b/test/performance_test/main.go @@ -31,7 +31,7 @@ import ( func newPeer(g config.Global, p config.Neighbor, incoming chan *server.FsmMsg) *server.Peer { tbl := table.NewTableManager([]bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}, 0, 0) peer := server.NewPeer(g, p, tbl, table.NewRoutingPolicy()) - server.NewFSMHandler(peer.Fsm(), incoming, peer.Outgoing()) + server.NewFSMHandler(peer.Fsm(), incoming, incoming, peer.Outgoing()) return peer } @@ -85,7 +85,7 @@ func main() { nextState := msg.MsgData.(bgp.FSMState) fsm := peer.Fsm() fsm.StateChange(nextState) - server.NewFSMHandler(fsm, incoming, peer.Outgoing()) + server.NewFSMHandler(fsm, incoming, incoming, peer.Outgoing()) if nextState == bgp.BGP_FSM_ESTABLISHED { established++ } |