summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/server.go62
1 files changed, 48 insertions, 14 deletions
diff --git a/server/server.go b/server/server.go
index 859abba2..0bcc578f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -49,6 +49,11 @@ type SenderMsg struct {
twoBytesAs bool
}
+type broadcastMsg struct {
+ req *GrpcRequest
+ result *GrpcResponse
+}
+
type BgpServer struct {
bgpConfig config.Bgp
globalTypeCh chan config.Global
@@ -60,9 +65,9 @@ type BgpServer struct {
policyMap map[string]*policy.Policy
routingPolicy config.RoutingPolicy
broadcastReqs []*GrpcRequest
-
- neighborMap map[string]*Peer
- localRibMap map[string]*LocalRib
+ broadcastMsgs []*broadcastMsg
+ neighborMap map[string]*Peer
+ localRibMap map[string]*LocalRib
}
func NewBgpServer(port int) *BgpServer {
@@ -135,6 +140,19 @@ func (server *BgpServer) Serve() {
}
}(senderCh)
+ broadcastCh := make(chan *broadcastMsg, 8)
+ go func(ch chan *broadcastMsg) {
+ for {
+ m := <-ch
+ select {
+ case <-m.req.EndCh:
+ continue
+ default:
+ }
+ m.req.ResponseCh <- m.result
+ }
+ }(broadcastCh)
+
// FIXME
rfList := func(l []config.AfiSafi) []bgp.RouteFamily {
rfList := []bgp.RouteFamily{}
@@ -177,6 +195,13 @@ func (server *BgpServer) Serve() {
sCh = senderCh
firstMsg = senderMsgs[0]
}
+ var firstBroadcastMsg *broadcastMsg
+ var bCh chan *broadcastMsg
+ if len(server.broadcastMsgs) > 0 {
+ bCh = broadcastCh
+ firstBroadcastMsg = server.broadcastMsgs[0]
+ }
+
select {
case conn := <-acceptCh:
remoteAddr, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
@@ -266,7 +291,8 @@ func (server *BgpServer) Serve() {
}
case sCh <- firstMsg:
senderMsgs = senderMsgs[1:]
-
+ case bCh <- firstBroadcastMsg:
+ server.broadcastMsgs = server.broadcastMsgs[1:]
case grpcReq := <-server.GrpcReqCh:
m := server.handleGrpc(grpcReq)
if len(m) > 0 {
@@ -448,16 +474,20 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) {
}
remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
for _, req := range server.broadcastReqs {
- if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED {
- remainReqs = append(remainReqs, req)
- continue
- }
select {
case <-req.EndCh:
continue
- case req.ResponseCh <- result:
default:
}
+ if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED {
+ remainReqs = append(remainReqs, req)
+ continue
+ }
+ m := &broadcastMsg{
+ req: req,
+ result: result,
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
remainReqs = append(remainReqs, req)
}
server.broadcastReqs = remainReqs
@@ -470,18 +500,22 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) {
}
remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
for _, req := range server.broadcastReqs {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE
ignore = ignore || (req.RemoteAddr != "" && req.RemoteAddr != peer.config.NeighborAddress.String())
if ignore {
remainReqs = append(remainReqs, req)
continue
}
- select {
- case <-req.EndCh:
- continue
- case req.ResponseCh <- result:
- default:
+ m := &broadcastMsg{
+ req: req,
+ result: result,
}
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
remainReqs = append(remainReqs, req)
}
server.broadcastReqs = remainReqs