diff options
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 94 |
1 files changed, 44 insertions, 50 deletions
diff --git a/server/server.go b/server/server.go index 7f84d5db..af88f3ca 100644 --- a/server/server.go +++ b/server/server.go @@ -38,9 +38,8 @@ import ( var policyMutex sync.RWMutex type SenderMsg struct { - messages []*bgp.BGPMessage - sendCh chan *bgp.BGPMessage - destination string + ch chan *FsmOutgoingMsg + msg *FsmOutgoingMsg } type broadcastMsg interface { @@ -206,19 +205,17 @@ func (server *BgpServer) Serve() { senderCh := make(chan *SenderMsg, 1<<16) go func(ch chan *SenderMsg) { - for { - // TODO: must be more clever. Slow peer makes other peers slow too. - m := <-ch - w := func(c chan *bgp.BGPMessage, msg *bgp.BGPMessage) { - // nasty but the peer could already become non established state before here. - defer func() { recover() }() - c <- msg - } + w := func(c chan *FsmOutgoingMsg, msg *FsmOutgoingMsg) { + // nasty but the peer could already become non established state before here. + defer func() { recover() }() + c <- msg + } - for _, b := range m.messages { - w(m.sendCh, b) - } + for m := range ch { + // TODO: must be more clever. Slow peer makes other peers slow too. + w(m.ch, m.msg) } + }(senderCh) broadcastCh := make(chan broadcastMsg, 8) @@ -356,11 +353,14 @@ func (server *BgpServer) Serve() { } } -func newSenderMsg(peer *Peer, messages []*bgp.BGPMessage) *SenderMsg { +func newSenderMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) *SenderMsg { return &SenderMsg{ - messages: messages, - sendCh: peer.outgoing, - destination: peer.conf.Config.NeighborAddress, + ch: peer.outgoing, + msg: &FsmOutgoingMsg{ + Paths: paths, + Notification: notification, + StayIdle: stayIdle, + }, } } @@ -451,9 +451,8 @@ func filterpath(peer *Peer, path *table.Path) *table.Path { } func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) []*SenderMsg { - msgs := make([]*SenderMsg, 0) - ids := make([]string, 0, len(server.neighborMap)) + msgs := make([]*SenderMsg, 0, len(server.neighborMap)) if peer.isRouteServerClient() { for _, targetPeer := range server.neighborMap { if !targetPeer.isRouteServerClient() || targetPeer == peer || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { @@ -477,8 +476,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil continue } if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 { - msgList := table.CreateUpdateMsgFromPaths(paths) - msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false)) } } } @@ -741,22 +739,20 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] } } - msgs := make([]*SenderMsg, 0) + msgs := make([]*SenderMsg, 0, len(server.neighborMap)) for _, targetPeer := range server.neighborMap { if (peer == nil && targetPeer.isRouteServerClient()) || (peer != nil && peer.isRouteServerClient() != targetPeer.isRouteServerClient()) { continue } if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 { - msgList := table.CreateUpdateMsgFromPaths(paths) - msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false)) } } return msgs, alteredPathList } func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { - msgs := make([]*SenderMsg, 0) - + var msgs []*SenderMsg switch e.MsgType { case FSM_MSG_STATE_CHANGE: nextState := e.MsgData.(bgp.FSMState) @@ -779,7 +775,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { drop = peer.configuredRFlist() } peer.DropAll(drop) - msgs = append(msgs, server.dropPeerAllRoutes(peer, drop)...) + msgs = server.dropPeerAllRoutes(peer, drop) } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE { // RFC 4724 4.2 // If the session does not get re-established within the "Restart Time" @@ -787,12 +783,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { // delete all the stale routes from the peer that it is retaining. peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false peer.DropAll(peer.configuredRFlist()) - - msgs = append(msgs, server.dropPeerAllRoutes(peer, peer.configuredRFlist())...) + msgs = server.dropPeerAllRoutes(peer, peer.configuredRFlist()) } close(peer.outgoing) - peer.outgoing = make(chan *bgp.BGPMessage, 128) + peer.outgoing = make(chan *FsmOutgoingMsg, 128) if nextState == bgp.BGP_FSM_ESTABLISHED { // update for export policy laddr, _ := peer.fsm.LocalHostPort() @@ -801,7 +796,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { pathList, _ := peer.getBestFromLocal(peer.configuredRFlist()) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList))) + msgs = []*SenderMsg{newSenderMsg(peer, pathList, nil, false)} } } else { // RFC 4724 4.1 @@ -843,13 +838,16 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { } peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, oldState) - + case FSM_MSG_ROUTE_REFRESH: + if paths := peer.handleRouteRefresh(e); len(paths) > 0 { + return []*SenderMsg{newSenderMsg(peer, paths, nil, false)} + } case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { case *bgp.MessageError: - msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)})) + return []*SenderMsg{newSenderMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)} case *bgp.BGPMessage: - pathList, msgList, eor := peer.handleBGPmessage(e) + pathList, eor := peer.handleUpdate(e) if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] @@ -870,13 +868,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) } - if len(msgList) > 0 { - msgs = append(msgs, newSenderMsg(peer, msgList)) - } - if len(pathList) > 0 { - m, altered := server.propagateUpdate(peer, pathList) - msgs = append(msgs, m...) + var altered []*table.Path + msgs, altered = server.propagateUpdate(peer, pathList) if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() @@ -930,10 +924,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { if !p.isGracefulRestartEnabled() { continue } - pathList, _ := p.getBestFromLocal(p.configuredRFlist()) - if len(pathList) > 0 { - p.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(p, table.CreateUpdateMsgFromPaths(pathList))) + paths, _ := p.getBestFromLocal(p.configuredRFlist()) + if len(paths) > 0 { + p.adjRibOut.Update(paths) + msgs = append(msgs, newSenderMsg(p, paths, nil, false)) } } log.WithFields(log.Fields{ @@ -1970,7 +1964,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { logOp(grpcReq.Name, "Neighbor shutdown") m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) for _, peer := range peers { - msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m})) + msgs = append(msgs, newSenderMsg(peer, nil, m, false)) } grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) @@ -1981,10 +1975,10 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { break } logOp(grpcReq.Name, "Neighbor reset") + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) for _, peer := range peers { peer.fsm.idleHoldTime = peer.conf.Timers.Config.IdleHoldTimeAfterReset - m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) - msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m})) + msgs = append(msgs, newSenderMsg(peer, nil, m, false)) } grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) @@ -2063,7 +2057,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { pathList, filtered := peer.getBestFromLocal(families) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList))) + msgs = append(msgs, newSenderMsg(peer, pathList, nil, false)) } if grpcReq.RequestType != REQ_DEFERRAL_TIMER_EXPIRED && len(filtered) > 0 { withdrawnList := make([]*table.Path, 0, len(filtered)) @@ -2079,7 +2073,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { withdrawnList = append(withdrawnList, p.Clone(true)) } } - msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(withdrawnList))) + msgs = append(msgs, newSenderMsg(peer, withdrawnList, nil, false)) } } grpcReq.ResponseCh <- &GrpcResponse{} |