summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-07-22 22:35:28 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-07-22 22:35:28 +0900
commitcfaa537b8301e5f846839021fd355ab7390b12ff (patch)
tree5a6a5d4022646ee9fd2f769d1f4c2fa3dfe7f2c6 /server/server.go
parenta4501ccc3f680de0a9f9cc8621256d71d3679478 (diff)
server: make monitor rcp request reliable
Currently, if a monitor request sender doesn't read from bgpd some time, a message could be dropped. Instead, this makes bgpd keep messages in memory until they are read. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go62
1 files changed, 48 insertions, 14 deletions
diff --git a/server/server.go b/server/server.go
index 859abba2..0bcc578f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -49,6 +49,11 @@ type SenderMsg struct {
twoBytesAs bool
}
+type broadcastMsg struct {
+ req *GrpcRequest
+ result *GrpcResponse
+}
+
type BgpServer struct {
bgpConfig config.Bgp
globalTypeCh chan config.Global
@@ -60,9 +65,9 @@ type BgpServer struct {
policyMap map[string]*policy.Policy
routingPolicy config.RoutingPolicy
broadcastReqs []*GrpcRequest
-
- neighborMap map[string]*Peer
- localRibMap map[string]*LocalRib
+ broadcastMsgs []*broadcastMsg
+ neighborMap map[string]*Peer
+ localRibMap map[string]*LocalRib
}
func NewBgpServer(port int) *BgpServer {
@@ -135,6 +140,19 @@ func (server *BgpServer) Serve() {
}
}(senderCh)
+ broadcastCh := make(chan *broadcastMsg, 8)
+ go func(ch chan *broadcastMsg) {
+ for {
+ m := <-ch
+ select {
+ case <-m.req.EndCh:
+ continue
+ default:
+ }
+ m.req.ResponseCh <- m.result
+ }
+ }(broadcastCh)
+
// FIXME
rfList := func(l []config.AfiSafi) []bgp.RouteFamily {
rfList := []bgp.RouteFamily{}
@@ -177,6 +195,13 @@ func (server *BgpServer) Serve() {
sCh = senderCh
firstMsg = senderMsgs[0]
}
+ var firstBroadcastMsg *broadcastMsg
+ var bCh chan *broadcastMsg
+ if len(server.broadcastMsgs) > 0 {
+ bCh = broadcastCh
+ firstBroadcastMsg = server.broadcastMsgs[0]
+ }
+
select {
case conn := <-acceptCh:
remoteAddr, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
@@ -266,7 +291,8 @@ func (server *BgpServer) Serve() {
}
case sCh <- firstMsg:
senderMsgs = senderMsgs[1:]
-
+ case bCh <- firstBroadcastMsg:
+ server.broadcastMsgs = server.broadcastMsgs[1:]
case grpcReq := <-server.GrpcReqCh:
m := server.handleGrpc(grpcReq)
if len(m) > 0 {
@@ -448,16 +474,20 @@ func (server *BgpServer) broadcastBests(bests []*table.Path) {
}
remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
for _, req := range server.broadcastReqs {
- if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED {
- remainReqs = append(remainReqs, req)
- continue
- }
select {
case <-req.EndCh:
continue
- case req.ResponseCh <- result:
default:
}
+ if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED {
+ remainReqs = append(remainReqs, req)
+ continue
+ }
+ m := &broadcastMsg{
+ req: req,
+ result: result,
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
remainReqs = append(remainReqs, req)
}
server.broadcastReqs = remainReqs
@@ -470,18 +500,22 @@ func (server *BgpServer) broadcastPeerState(peer *Peer) {
}
remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
for _, req := range server.broadcastReqs {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE
ignore = ignore || (req.RemoteAddr != "" && req.RemoteAddr != peer.config.NeighborAddress.String())
if ignore {
remainReqs = append(remainReqs, req)
continue
}
- select {
- case <-req.EndCh:
- continue
- case req.ResponseCh <- result:
- default:
+ m := &broadcastMsg{
+ req: req,
+ result: result,
}
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
remainReqs = append(remainReqs, req)
}
server.broadcastReqs = remainReqs