summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/fsm.go6
-rw-r--r--server/peer.go4
-rw-r--r--server/server.go45
-rw-r--r--test/performance_test/main.go4
4 files changed, 40 insertions, 19 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 1a5943d2..3aaf702f 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -305,16 +305,18 @@ type FSMHandler struct {
msgCh chan *FsmMsg
errorCh chan bool
incoming chan *FsmMsg
+ stateCh chan *FsmMsg
outgoing chan *bgp.BGPMessage
holdTimerResetCh chan bool
reason string
}
-func NewFSMHandler(fsm *FSM, incoming chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
h := &FSMHandler{
fsm: fsm,
errorCh: make(chan bool, 2),
incoming: incoming,
+ stateCh: stateCh,
outgoing: outgoing,
holdTimerResetCh: make(chan bool, 2),
}
@@ -1020,7 +1022,7 @@ func (h *FSMHandler) loop() error {
MsgDst: fsm.pConf.Transport.Config.LocalAddress,
MsgData: nextState,
}
- h.incoming <- e
+ h.stateCh <- e
}
return nil
}
diff --git a/server/peer.go b/server/peer.go
index 9b7e4a81..22f98e6c 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -182,8 +182,8 @@ func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage)
return nil, nil
}
-func (peer *Peer) startFSMHandler(incoming chan *FsmMsg) {
- peer.fsm.h = NewFSMHandler(peer.fsm, incoming, peer.outgoing)
+func (peer *Peer) startFSMHandler(incoming, stateCh chan *FsmMsg) {
+ peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
}
func (peer *Peer) PassConn(conn *net.TCPConn) {
diff --git a/server/server.go b/server/server.go
index afb4550a..00f047ad 100644
--- a/server/server.go
+++ b/server/server.go
@@ -81,6 +81,7 @@ type BgpServer struct {
deletedPeerCh chan config.Neighbor
updatedPeerCh chan config.Neighbor
fsmincomingCh chan *FsmMsg
+ fsmStateCh chan *FsmMsg
rpkiConfigCh chan config.RpkiServers
bmpConfigCh chan config.BmpServers
@@ -250,12 +251,26 @@ func (server *BgpServer) Serve() {
}
server.fsmincomingCh = make(chan *FsmMsg, 4096)
+ server.fsmStateCh = make(chan *FsmMsg, 4096)
var senderMsgs []*SenderMsg
var zapiMsgCh chan *zebra.Message
if server.zclient != nil {
zapiMsgCh = server.zclient.Receive()
}
+
+ handleFsmMsg := func(e *FsmMsg) {
+ peer, found := server.neighborMap[e.MsgSrc]
+ if !found {
+ log.Warn("Can't find the neighbor ", e.MsgSrc)
+ return
+ }
+ m := server.handleFSMMessage(peer, e)
+ if len(m) > 0 {
+ senderMsgs = append(senderMsgs, m...)
+ }
+ }
+
for {
var firstMsg *SenderMsg
var sCh chan *SenderMsg
@@ -319,6 +334,16 @@ func (server *BgpServer) Serve() {
default:
}
+ for {
+ select {
+ case e := <-server.fsmStateCh:
+ handleFsmMsg(e)
+ default:
+ goto CONT
+ }
+ }
+ CONT:
+
select {
case c := <-server.rpkiConfigCh:
server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c)
@@ -386,7 +411,7 @@ func (server *BgpServer) Serve() {
}
}
server.neighborMap[addr] = peer
- peer.startFSMHandler(server.fsmincomingCh)
+ peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer)
case config := <-server.deletedPeerCh:
addr := config.Config.NeighborAddress
@@ -419,15 +444,9 @@ func (server *BgpServer) Serve() {
peer.conf = config
server.setPolicyByConfig(peer.ID(), config.ApplyPolicy)
case e := <-server.fsmincomingCh:
- peer, found := server.neighborMap[e.MsgSrc]
- if !found {
- log.Warn("Can't find the neighbor ", e.MsgSrc)
- break
- }
- m := server.handleFSMMessage(peer, e, server.fsmincomingCh)
- if len(m) > 0 {
- senderMsgs = append(senderMsgs, m...)
- }
+ handleFsmMsg(e)
+ case e := <-server.fsmStateCh:
+ handleFsmMsg(e)
case sCh <- firstMsg:
senderMsgs = senderMsgs[1:]
case bCh <- firstBroadcastMsg:
@@ -764,7 +783,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
return msgs, alteredPathList
}
-func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan *FsmMsg) []*SenderMsg {
+func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
msgs := make([]*SenderMsg, 0)
switch e.MsgType {
@@ -831,7 +850,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan *
peer.conf.State = config.NeighborState{}
peer.conf.Timers.State = config.TimersState{}
}
- peer.startFSMHandler(incoming)
+ peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer)
case FSM_MSG_BGP_MESSAGE:
@@ -2059,7 +2078,7 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S
}
}
server.neighborMap[addr] = peer
- peer.startFSMHandler(server.fsmincomingCh)
+ peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer)
case api.Operation_DEL:
SetTcpMD5SigSockopts(listener(net.ParseIP(addr)), addr, "")
diff --git a/test/performance_test/main.go b/test/performance_test/main.go
index 4385dd68..aec15b54 100644
--- a/test/performance_test/main.go
+++ b/test/performance_test/main.go
@@ -31,7 +31,7 @@ import (
func newPeer(g config.Global, p config.Neighbor, incoming chan *server.FsmMsg) *server.Peer {
tbl := table.NewTableManager([]bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}, 0, 0)
peer := server.NewPeer(g, p, tbl, table.NewRoutingPolicy())
- server.NewFSMHandler(peer.Fsm(), incoming, peer.Outgoing())
+ server.NewFSMHandler(peer.Fsm(), incoming, incoming, peer.Outgoing())
return peer
}
@@ -85,7 +85,7 @@ func main() {
nextState := msg.MsgData.(bgp.FSMState)
fsm := peer.Fsm()
fsm.StateChange(nextState)
- server.NewFSMHandler(fsm, incoming, peer.Outgoing())
+ server.NewFSMHandler(fsm, incoming, incoming, peer.Outgoing())
if nextState == bgp.BGP_FSM_ESTABLISHED {
established++
}