diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-04-06 22:44:54 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-04-10 00:04:48 +0900 |
commit | 0d2633e7831cb2406bacf62a232fcca7184ff988 (patch) | |
tree | 3b3e32f5106e94b100af2f802142b5e1ad69d922 /server | |
parent | 3ae8c9170b399ed9b64fe5382fc9a58c5c9f8b6b (diff) |
server: serialize in fsm send loop
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/fsm.go | 38 | ||||
-rw-r--r-- | server/fsm_test.go | 2 | ||||
-rw-r--r-- | server/peer.go | 106 | ||||
-rw-r--r-- | server/server.go | 94 |
4 files changed, 126 insertions, 114 deletions
diff --git a/server/fsm.go b/server/fsm.go index 68e27074..48f4e095 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -81,6 +81,7 @@ const ( _ FsmMsgType = iota FSM_MSG_STATE_CHANGE FSM_MSG_BGP_MESSAGE + FSM_MSG_ROUTE_REFRESH ) type FsmMsg struct { @@ -92,6 +93,12 @@ type FsmMsg struct { payload []byte } +type FsmOutgoingMsg struct { + Paths []*table.Path + Notification *bgp.BGPMessage + StayIdle bool +} + const ( HOLDTIME_OPENSENT = 240 HOLDTIME_IDLE = 5 @@ -359,11 +366,11 @@ type FSMHandler struct { errorCh chan FsmStateReason incoming *channels.InfiniteChannel stateCh chan *FsmMsg - outgoing chan *bgp.BGPMessage + outgoing chan *FsmOutgoingMsg holdTimerResetCh chan bool } -func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { +func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *FsmOutgoingMsg) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan FsmStateReason, 2), @@ -612,6 +619,8 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { fmsg.MsgData = m if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { switch m.Header.Type { + case bgp.BGP_MSG_ROUTE_REFRESH: + fmsg.MsgType = FSM_MSG_ROUTE_REFRESH case bgp.BGP_MSG_UPDATE: body := m.Body.(*bgp.BGPUpdate) confedCheck := !config.IsConfederationMember(h.fsm.gConf, h.fsm.pConf) && config.IsEBGPPeer(h.fsm.gConf, h.fsm.pConf) @@ -1071,14 +1080,26 @@ func (h *FSMHandler) sendMessageloop() error { } return nil case m := <-h.outgoing: - if err := send(m); err != nil { - return nil + for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths) { + if err := send(msg); err != nil { + return nil + } + } + if m.Notification != nil { + if err := send(m.Notification); err != nil { + return nil + } + if m.StayIdle { + select { + case h.fsm.adminStateCh <- ADMIN_STATE_DOWN: + default: + } + } } case <-ticker.C: if err := send(bgp.NewBGPKeepAliveMessage()); err != nil { return nil } - } } } @@ -1146,7 +1167,7 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) { "data": bgp.BGP_FSM_ESTABLISHED, }).Warn("hold timer expired") m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) - h.outgoing <- m + h.outgoing <- &FsmOutgoingMsg{Notification: m} return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED case <-h.holdTimerResetCh: if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 { @@ -1157,9 +1178,8 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) { if err == nil { switch s { case ADMIN_STATE_DOWN: - m := bgp.NewBGPNotificationMessage( - bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) - h.outgoing <- m + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) + h.outgoing <- &FsmOutgoingMsg{Notification: m} } } } diff --git a/server/fsm_test.go b/server/fsm_test.go index 8d98d751..afe66a0f 100644 --- a/server/fsm_test.go +++ b/server/fsm_test.go @@ -294,7 +294,7 @@ func makePeerAndHandler() (*Peer, *FSMHandler) { p.fsm = NewFSM(&gConf, &pConf, table.NewRoutingPolicy()) - p.outgoing = make(chan *bgp.BGPMessage, 4096) + p.outgoing = make(chan *FsmOutgoingMsg, 4096) h := &FSMHandler{ fsm: p.fsm, diff --git a/server/peer.go b/server/peer.go index f5491b33..950526dd 100644 --- a/server/peer.go +++ b/server/peer.go @@ -39,7 +39,7 @@ type Peer struct { fsm *FSM adjRibIn *table.AdjRib adjRibOut *table.AdjRib - outgoing chan *bgp.BGPMessage + outgoing chan *FsmOutgoingMsg policy *table.RoutingPolicy localRib *table.TableManager } @@ -48,7 +48,7 @@ func NewPeer(g config.Global, conf config.Neighbor, loc *table.TableManager, pol peer := &Peer{ gConf: g, conf: conf, - outgoing: make(chan *bgp.BGPMessage, 128), + outgoing: make(chan *FsmOutgoingMsg, 128), localRib: loc, policy: policy, } @@ -197,68 +197,66 @@ func (peer *Peer) processOutgoingPaths(paths []*table.Path) []*table.Path { return outgoing } -func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage, []bgp.RouteFamily) { +func (peer *Peer) handleRouteRefresh(e *FsmMsg) []*table.Path { + m := e.MsgData.(*bgp.BGPMessage) + rr := m.Body.(*bgp.BGPRouteRefresh) + rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI) + if _, ok := peer.fsm.rfMap[rf]; !ok { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.conf.Config.NeighborAddress, + "Data": rf, + }).Warn("Route family isn't supported") + return nil + } + if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.conf.Config.NeighborAddress, + }).Warn("ROUTE_REFRESH received but the capability wasn't advertised") + return nil + } + rfList := []bgp.RouteFamily{rf} + peer.adjRibOut.Drop(rfList) + accepted, filtered := peer.getBestFromLocal(rfList) + peer.adjRibOut.Update(accepted) + for _, path := range filtered { + path.IsWithdraw = true + accepted = append(accepted, path) + } + return accepted +} + +func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily) { m := e.MsgData.(*bgp.BGPMessage) log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.conf.Config.NeighborAddress, "data": m, }).Debug("received") - eor := []bgp.RouteFamily{} - - switch m.Header.Type { - case bgp.BGP_MSG_ROUTE_REFRESH: - rr := m.Body.(*bgp.BGPRouteRefresh) - rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI) - if _, ok := peer.fsm.rfMap[rf]; !ok { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.conf.Config.NeighborAddress, - "Data": rf, - }).Warn("Route family isn't supported") - break - } - if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; ok { - rfList := []bgp.RouteFamily{rf} - peer.adjRibOut.Drop(rfList) - accepted, filtered := peer.getBestFromLocal(rfList) - peer.adjRibOut.Update(accepted) - for _, path := range filtered { - path.IsWithdraw = true - accepted = append(accepted, path) + peer.conf.Timers.State.UpdateRecvTime = time.Now().Unix() + if len(e.PathList) > 0 { + peer.adjRibIn.Update(e.PathList) + paths := make([]*table.Path, 0, len(e.PathList)) + eor := []bgp.RouteFamily{} + for _, path := range e.PathList { + if path.IsEOR() { + family := path.GetRouteFamily() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.conf.Config.NeighborAddress, + "AddressFamily": family, + }).Debug("EOR received") + eor = append(eor, family) + continue } - return nil, table.CreateUpdateMsgFromPaths(accepted), eor - } else { - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.conf.Config.NeighborAddress, - }).Warn("ROUTE_REFRESH received but the capability wasn't advertised") - } - - case bgp.BGP_MSG_UPDATE: - peer.conf.Timers.State.UpdateRecvTime = time.Now().Unix() - if len(e.PathList) > 0 { - peer.adjRibIn.Update(e.PathList) - paths := make([]*table.Path, 0, len(e.PathList)) - for _, path := range e.PathList { - if path.IsEOR() { - family := path.GetRouteFamily() - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.conf.Config.NeighborAddress, - "AddressFamily": family, - }).Debug("EOR received") - eor = append(eor, family) - continue - } - if path.Filtered(peer.ID()) != table.POLICY_DIRECTION_IN { - paths = append(paths, path) - } + if path.Filtered(peer.ID()) != table.POLICY_DIRECTION_IN { + paths = append(paths, path) } - return paths, nil, eor } + return paths, eor } - return nil, nil, eor + return nil, nil } func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) { diff --git a/server/server.go b/server/server.go index 7f84d5db..af88f3ca 100644 --- a/server/server.go +++ b/server/server.go @@ -38,9 +38,8 @@ import ( var policyMutex sync.RWMutex type SenderMsg struct { - messages []*bgp.BGPMessage - sendCh chan *bgp.BGPMessage - destination string + ch chan *FsmOutgoingMsg + msg *FsmOutgoingMsg } type broadcastMsg interface { @@ -206,19 +205,17 @@ func (server *BgpServer) Serve() { senderCh := make(chan *SenderMsg, 1<<16) go func(ch chan *SenderMsg) { - for { - // TODO: must be more clever. Slow peer makes other peers slow too. - m := <-ch - w := func(c chan *bgp.BGPMessage, msg *bgp.BGPMessage) { - // nasty but the peer could already become non established state before here. - defer func() { recover() }() - c <- msg - } + w := func(c chan *FsmOutgoingMsg, msg *FsmOutgoingMsg) { + // nasty but the peer could already become non established state before here. + defer func() { recover() }() + c <- msg + } - for _, b := range m.messages { - w(m.sendCh, b) - } + for m := range ch { + // TODO: must be more clever. Slow peer makes other peers slow too. + w(m.ch, m.msg) } + }(senderCh) broadcastCh := make(chan broadcastMsg, 8) @@ -356,11 +353,14 @@ func (server *BgpServer) Serve() { } } -func newSenderMsg(peer *Peer, messages []*bgp.BGPMessage) *SenderMsg { +func newSenderMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) *SenderMsg { return &SenderMsg{ - messages: messages, - sendCh: peer.outgoing, - destination: peer.conf.Config.NeighborAddress, + ch: peer.outgoing, + msg: &FsmOutgoingMsg{ + Paths: paths, + Notification: notification, + StayIdle: stayIdle, + }, } } @@ -451,9 +451,8 @@ func filterpath(peer *Peer, path *table.Path) *table.Path { } func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) []*SenderMsg { - msgs := make([]*SenderMsg, 0) - ids := make([]string, 0, len(server.neighborMap)) + msgs := make([]*SenderMsg, 0, len(server.neighborMap)) if peer.isRouteServerClient() { for _, targetPeer := range server.neighborMap { if !targetPeer.isRouteServerClient() || targetPeer == peer || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { @@ -477,8 +476,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil continue } if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 { - msgList := table.CreateUpdateMsgFromPaths(paths) - msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false)) } } } @@ -741,22 +739,20 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] } } - msgs := make([]*SenderMsg, 0) + msgs := make([]*SenderMsg, 0, len(server.neighborMap)) for _, targetPeer := range server.neighborMap { if (peer == nil && targetPeer.isRouteServerClient()) || (peer != nil && peer.isRouteServerClient() != targetPeer.isRouteServerClient()) { continue } if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 { - msgList := table.CreateUpdateMsgFromPaths(paths) - msgs = append(msgs, newSenderMsg(targetPeer, msgList)) + msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false)) } } return msgs, alteredPathList } func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { - msgs := make([]*SenderMsg, 0) - + var msgs []*SenderMsg switch e.MsgType { case FSM_MSG_STATE_CHANGE: nextState := e.MsgData.(bgp.FSMState) @@ -779,7 +775,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { drop = peer.configuredRFlist() } peer.DropAll(drop) - msgs = append(msgs, server.dropPeerAllRoutes(peer, drop)...) + msgs = server.dropPeerAllRoutes(peer, drop) } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE { // RFC 4724 4.2 // If the session does not get re-established within the "Restart Time" @@ -787,12 +783,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { // delete all the stale routes from the peer that it is retaining. peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false peer.DropAll(peer.configuredRFlist()) - - msgs = append(msgs, server.dropPeerAllRoutes(peer, peer.configuredRFlist())...) + msgs = server.dropPeerAllRoutes(peer, peer.configuredRFlist()) } close(peer.outgoing) - peer.outgoing = make(chan *bgp.BGPMessage, 128) + peer.outgoing = make(chan *FsmOutgoingMsg, 128) if nextState == bgp.BGP_FSM_ESTABLISHED { // update for export policy laddr, _ := peer.fsm.LocalHostPort() @@ -801,7 +796,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { pathList, _ := peer.getBestFromLocal(peer.configuredRFlist()) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList))) + msgs = []*SenderMsg{newSenderMsg(peer, pathList, nil, false)} } } else { // RFC 4724 4.1 @@ -843,13 +838,16 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { } peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, oldState) - + case FSM_MSG_ROUTE_REFRESH: + if paths := peer.handleRouteRefresh(e); len(paths) > 0 { + return []*SenderMsg{newSenderMsg(peer, paths, nil, false)} + } case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { case *bgp.MessageError: - msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)})) + return []*SenderMsg{newSenderMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)} case *bgp.BGPMessage: - pathList, msgList, eor := peer.handleBGPmessage(e) + pathList, eor := peer.handleUpdate(e) if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] @@ -870,13 +868,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) } - if len(msgList) > 0 { - msgs = append(msgs, newSenderMsg(peer, msgList)) - } - if len(pathList) > 0 { - m, altered := server.propagateUpdate(peer, pathList) - msgs = append(msgs, m...) + var altered []*table.Path + msgs, altered = server.propagateUpdate(peer, pathList) if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() @@ -930,10 +924,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { if !p.isGracefulRestartEnabled() { continue } - pathList, _ := p.getBestFromLocal(p.configuredRFlist()) - if len(pathList) > 0 { - p.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(p, table.CreateUpdateMsgFromPaths(pathList))) + paths, _ := p.getBestFromLocal(p.configuredRFlist()) + if len(paths) > 0 { + p.adjRibOut.Update(paths) + msgs = append(msgs, newSenderMsg(p, paths, nil, false)) } } log.WithFields(log.Fields{ @@ -1970,7 +1964,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { logOp(grpcReq.Name, "Neighbor shutdown") m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) for _, peer := range peers { - msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m})) + msgs = append(msgs, newSenderMsg(peer, nil, m, false)) } grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) @@ -1981,10 +1975,10 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { break } logOp(grpcReq.Name, "Neighbor reset") + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) for _, peer := range peers { peer.fsm.idleHoldTime = peer.conf.Timers.Config.IdleHoldTimeAfterReset - m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) - msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m})) + msgs = append(msgs, newSenderMsg(peer, nil, m, false)) } grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) @@ -2063,7 +2057,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { pathList, filtered := peer.getBestFromLocal(families) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList))) + msgs = append(msgs, newSenderMsg(peer, pathList, nil, false)) } if grpcReq.RequestType != REQ_DEFERRAL_TIMER_EXPIRED && len(filtered) > 0 { withdrawnList := make([]*table.Path, 0, len(filtered)) @@ -2079,7 +2073,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { withdrawnList = append(withdrawnList, p.Clone(true)) } } - msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(withdrawnList))) + msgs = append(msgs, newSenderMsg(peer, withdrawnList, nil, false)) } } grpcReq.ResponseCh <- &GrpcResponse{} |