diff options
-rw-r--r-- | server/server.go | 62 |
1 files changed, 48 insertions, 14 deletions
diff --git a/server/server.go b/server/server.go index 859abba2..0bcc578f 100644 --- a/server/server.go +++ b/server/server.go @@ -49,6 +49,11 @@ type SenderMsg struct { twoBytesAs bool } +type broadcastMsg struct { + req *GrpcRequest + result *GrpcResponse +} + type BgpServer struct { bgpConfig config.Bgp globalTypeCh chan config.Global @@ -60,9 +65,9 @@ type BgpServer struct { policyMap map[string]*policy.Policy routingPolicy config.RoutingPolicy broadcastReqs []*GrpcRequest - - neighborMap map[string]*Peer - localRibMap map[string]*LocalRib + broadcastMsgs []*broadcastMsg + neighborMap map[string]*Peer + localRibMap map[string]*LocalRib } func NewBgpServer(port int) *BgpServer { @@ -135,6 +140,19 @@ func (server *BgpServer) Serve() { } }(senderCh) + broadcastCh := make(chan *broadcastMsg, 8) + go func(ch chan *broadcastMsg) { + for { + m := <-ch + select { + case <-m.req.EndCh: + continue + default: + } + m.req.ResponseCh <- m.result + } + }(broadcastCh) + // FIXME rfList := func(l []config.AfiSafi) []bgp.RouteFamily { rfList := []bgp.RouteFamily{} @@ -177,6 +195,13 @@ func (server *BgpServer) Serve() { sCh = senderCh firstMsg = senderMsgs[0] } + var firstBroadcastMsg *broadcastMsg + var bCh chan *broadcastMsg + if len(server.broadcastMsgs) > 0 { + bCh = broadcastCh + firstBroadcastMsg = server.broadcastMsgs[0] + } + select { case conn := <-acceptCh: remoteAddr, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) @@ -266,7 +291,8 @@ func (server *BgpServer) Serve() { } case sCh <- firstMsg: senderMsgs = senderMsgs[1:] - + case bCh <- firstBroadcastMsg: + server.broadcastMsgs = server.broadcastMsgs[1:] case grpcReq := <-server.GrpcReqCh: m := server.handleGrpc(grpcReq) if len(m) > 0 { @@ -448,16 +474,20 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) { } remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) for _, req := range server.broadcastReqs { - if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED { - remainReqs = append(remainReqs, req) - continue - } select { case <-req.EndCh: continue - case req.ResponseCh <- result: default: } + if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED { + remainReqs = append(remainReqs, req) + continue + } + m := &broadcastMsg{ + req: req, + result: result, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) remainReqs = append(remainReqs, req) } server.broadcastReqs = remainReqs @@ -470,18 +500,22 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) { } remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) for _, req := range server.broadcastReqs { + select { + case <-req.EndCh: + continue + default: + } ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE ignore = ignore || (req.RemoteAddr != "" && req.RemoteAddr != peer.config.NeighborAddress.String()) if ignore { remainReqs = append(remainReqs, req) continue } - select { - case <-req.EndCh: - continue - case req.ResponseCh <- result: - default: + m := &broadcastMsg{ + req: req, + result: result, } + server.broadcastMsgs = append(server.broadcastMsgs, m) remainReqs = append(remainReqs, req) } server.broadcastReqs = remainReqs |