diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-14 16:54:53 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-14 16:49:34 +0900 |
commit | 1f5a66a33c485a97ae80688523f526dd945aabc1 (patch) | |
tree | c3e645d2268231ee9c3eb141ca479684165b739b | |
parent | ca2e7730bd8838b1f633e1f1808e0edce9269162 (diff) |
server: replace SenderMsg workaround with InfiniteChannel
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | server/fsm.go | 11 | ||||
-rw-r--r-- | server/peer.go | 19 | ||||
-rw-r--r-- | server/server.go | 197 |
3 files changed, 84 insertions, 143 deletions
diff --git a/server/fsm.go b/server/fsm.go index 3069893b..e0fb53c2 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -360,12 +360,12 @@ type FSMHandler struct { errorCh chan FsmStateReason incoming *channels.InfiniteChannel stateCh chan *FsmMsg - outgoing chan *FsmOutgoingMsg + outgoing *channels.InfiniteChannel holdTimerResetCh chan bool sentNotification string } -func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *FsmOutgoingMsg) *FSMHandler { +func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing *channels.InfiniteChannel) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan FsmStateReason, 2), @@ -1077,7 +1077,8 @@ func (h *FSMHandler) sendMessageloop() error { select { case <-h.t.Dying(): return nil - case m := <-h.outgoing: + case o := <-h.outgoing.Out(): + m := o.(*FsmOutgoingMsg) for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths) { if err := send(msg); err != nil { return nil @@ -1163,7 +1164,7 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) { "State": fsm.state.String(), }).Warn("hold timer expired") m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) - h.outgoing <- &FsmOutgoingMsg{Notification: m} + h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED case <-h.holdTimerResetCh: if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 { @@ -1175,7 +1176,7 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) { switch s { case ADMIN_STATE_DOWN: m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) - h.outgoing <- &FsmOutgoingMsg{Notification: m} + h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} } } } diff --git a/server/peer.go b/server/peer.go index 62716e4c..1058f2e9 100644 --- a/server/peer.go +++ b/server/peer.go @@ -38,7 +38,7 @@ type Peer struct { fsm *FSM adjRibIn *table.AdjRib adjRibOut *table.AdjRib - outgoing chan *FsmOutgoingMsg + outgoing *channels.InfiniteChannel policy *table.RoutingPolicy localRib *table.TableManager prefixLimitWarned map[bgp.RouteFamily]bool @@ -46,7 +46,7 @@ type Peer struct { func NewPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, policy *table.RoutingPolicy) *Peer { peer := &Peer{ - outgoing: make(chan *FsmOutgoingMsg, 128), + outgoing: channels.NewInfiniteChannel(), localRib: loc, policy: policy, fsm: NewFSM(g, conf, policy), @@ -282,28 +282,27 @@ func (peer *Peer) doPrefixLimit(k bgp.RouteFamily, c *config.PrefixLimitConfig) } -func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) ([]*SenderMsg, error) { +func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) error { x := peer.fsm.pConf.AfiSafis y := c if len(x) != len(y) { - return nil, fmt.Errorf("changing supported afi-safi is not allowed") + return fmt.Errorf("changing supported afi-safi is not allowed") } m := make(map[bgp.RouteFamily]config.PrefixLimitConfig) for _, e := range x { k, err := bgp.GetRouteFamily(string(e.Config.AfiSafiName)) if err != nil { - return nil, err + return err } m[k] = e.PrefixLimit.Config } - msgs := make([]*SenderMsg, 0, len(y)) for _, e := range y { k, err := bgp.GetRouteFamily(string(e.Config.AfiSafiName)) if err != nil { - return nil, err + return err } if p, ok := m[k]; !ok { - return nil, fmt.Errorf("changing supported afi-safi is not allowed") + return fmt.Errorf("changing supported afi-safi is not allowed") } else if !p.Equal(&e.PrefixLimit.Config) { log.WithFields(log.Fields{ "Topic": "Peer", @@ -316,12 +315,12 @@ func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) ([]*SenderMsg, err }).Warnf("update prefix limit configuration") peer.prefixLimitWarned[k] = false if msg := peer.doPrefixLimit(k, &e.PrefixLimit.Config); msg != nil { - msgs = append(msgs, newSenderMsg(peer, nil, msg, true)) + sendFsmOutgoingMsg(peer, nil, msg, true) } } } peer.fsm.pConf.AfiSafis = c - return msgs, nil + return nil } func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bgp.BGPMessage) { diff --git a/server/server.go b/server/server.go index 539c4086..757b645e 100644 --- a/server/server.go +++ b/server/server.go @@ -37,11 +37,6 @@ import ( var policyMutex sync.RWMutex -type SenderMsg struct { - ch chan *FsmOutgoingMsg - msg *FsmOutgoingMsg -} - type TCPListener struct { l *net.TCPListener ch chan struct{} @@ -140,25 +135,9 @@ func (server *BgpServer) Serve() { w, _ := newGrpcWatcher() server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w) - senderCh := make(chan *SenderMsg, 1<<16) - go func(ch chan *SenderMsg) { - w := func(c chan *FsmOutgoingMsg, msg *FsmOutgoingMsg) { - // nasty but the peer could already become non established state before here. - defer func() { recover() }() - c <- msg - } - - for m := range ch { - // TODO: must be more clever. Slow peer makes other peers slow too. - w(m.ch, m.msg) - } - - }(senderCh) - server.listeners = make([]*TCPListener, 0, 2) server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) - var senderMsgs []*SenderMsg handleFsmMsg := func(e *FsmMsg) { peer, found := server.neighborMap[e.MsgSrc] @@ -170,20 +149,10 @@ func (server *BgpServer) Serve() { log.Debug("FSM Version inconsistent") return } - m := server.handleFSMMessage(peer, e) - if len(m) > 0 { - senderMsgs = append(senderMsgs, m...) - } + server.handleFSMMessage(peer, e) } for { - var firstMsg *SenderMsg - var sCh chan *SenderMsg - if len(senderMsgs) > 0 { - sCh = senderCh - firstMsg = senderMsgs[0] - } - passConn := func(conn *net.TCPConn) { host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) ipaddr, _ := net.ResolveIPAddr("ip", host) @@ -231,10 +200,7 @@ func (server *BgpServer) Serve() { select { case grpcReq := <-server.GrpcReqCh: - m := server.handleGrpc(grpcReq) - if len(m) > 0 { - senderMsgs = append(senderMsgs, m...) - } + server.handleGrpc(grpcReq) case conn := <-server.acceptCh: passConn(conn) default: @@ -262,25 +228,17 @@ func (server *BgpServer) Serve() { handleFsmMsg(e.(*FsmMsg)) case e := <-server.fsmStateCh: handleFsmMsg(e) - case sCh <- firstMsg: - senderMsgs = senderMsgs[1:] case grpcReq := <-server.GrpcReqCh: - m := server.handleGrpc(grpcReq) - if len(m) > 0 { - senderMsgs = append(senderMsgs, m...) - } + server.handleGrpc(grpcReq) } } } -func newSenderMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) *SenderMsg { - return &SenderMsg{ - ch: peer.outgoing, - msg: &FsmOutgoingMsg{ - Paths: paths, - Notification: notification, - StayIdle: stayIdle, - }, +func sendFsmOutgoingMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) { + peer.outgoing.In() <- &FsmOutgoingMsg{ + Paths: paths, + Notification: notification, + StayIdle: stayIdle, } } @@ -387,9 +345,8 @@ func filterpath(peer *Peer, path *table.Path) *table.Path { return path } -func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) []*SenderMsg { +func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) { ids := make([]string, 0, len(server.neighborMap)) - msgs := make([]*SenderMsg, 0, len(server.neighborMap)) if peer.isRouteServerClient() { for _, targetPeer := range server.neighborMap { if !targetPeer.isRouteServerClient() || targetPeer == peer || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { @@ -412,11 +369,10 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil continue } if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()], nil); len(paths) > 0 { - msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false)) + sendFsmOutgoingMsg(targetPeer, paths, nil, false) } } } - return msgs } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { @@ -470,11 +426,10 @@ func (server *BgpServer) RSimportPaths(peer *Peer, pathList []*table.Path) []*ta return moded } -func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]*SenderMsg, []*table.Path) { +func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []*table.Path { rib := server.globalRib var alteredPathList, withdrawn []*table.Path var best map[string][]*table.Path - msgs := make([]*SenderMsg, 0, len(server.neighborMap)) if peer != nil && peer.isRouteServerClient() { for _, path := range pathList { @@ -545,14 +500,14 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] } else { paths = peer.processOutgoingPaths(paths, nil) } - msgs = append(msgs, newSenderMsg(peer, paths, nil, false)) + sendFsmOutgoingMsg(peer, paths, nil, false) } } alteredPathList = pathList var multi [][]*table.Path best, withdrawn, multi = rib.ProcessPaths([]string{table.GLOBAL_RIB_NAME}, pathList) if len(best[table.GLOBAL_RIB_NAME]) == 0 { - return nil, alteredPathList + return alteredPathList } server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) } @@ -562,14 +517,13 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] continue } if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()], withdrawn); len(paths) > 0 { - msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false)) + sendFsmOutgoingMsg(targetPeer, paths, nil, false) } } - return msgs, alteredPathList + return alteredPathList } -func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { - var msgs []*SenderMsg +func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { switch e.MsgType { case FSM_MSG_STATE_CHANGE: nextState := e.MsgData.(bgp.FSMState) @@ -593,7 +547,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { } peer.prefixLimitWarned = make(map[bgp.RouteFamily]bool) peer.DropAll(drop) - msgs = server.dropPeerAllRoutes(peer, drop) + server.dropPeerAllRoutes(peer, drop) } else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE { // RFC 4724 4.2 // If the session does not get re-established within the "Restart Time" @@ -601,11 +555,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { // delete all the stale routes from the peer that it is retaining. peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false peer.DropAll(peer.configuredRFlist()) - msgs = server.dropPeerAllRoutes(peer, peer.configuredRFlist()) + server.dropPeerAllRoutes(peer, peer.configuredRFlist()) } - close(peer.outgoing) - peer.outgoing = make(chan *FsmOutgoingMsg, 128) + peer.outgoing.Close() + peer.outgoing = channels.NewInfiniteChannel() if nextState == bgp.BGP_FSM_ESTABLISHED { // update for export policy laddr, _ := peer.fsm.LocalHostPort() @@ -643,7 +597,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { if len(pathList) > 0 { peer.adjRibOut.Update(pathList) - msgs = []*SenderMsg{newSenderMsg(peer, pathList, nil, false)} + sendFsmOutgoingMsg(peer, pathList, nil, false) } } else { // RFC 4724 4.1 @@ -683,17 +637,20 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { server.broadcastPeerState(peer, oldState) case FSM_MSG_ROUTE_REFRESH: if paths := peer.handleRouteRefresh(e); len(paths) > 0 { - return []*SenderMsg{newSenderMsg(peer, paths, nil, false)} + sendFsmOutgoingMsg(peer, paths, nil, false) + return } case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { case *bgp.MessageError: - return []*SenderMsg{newSenderMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)} + sendFsmOutgoingMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false) + return case *bgp.BGPMessage: server.roaManager.validate(e.PathList) pathList, eor, notification := peer.handleUpdate(e) if notification != nil { - return []*SenderMsg{newSenderMsg(peer, nil, notification, true)} + sendFsmOutgoingMsg(peer, nil, notification, true) + return } if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] @@ -716,7 +673,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { if len(pathList) > 0 { var altered []*table.Path - msgs, altered = server.propagateUpdate(peer, pathList) + altered = server.propagateUpdate(peer, pathList) if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() @@ -777,7 +734,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { paths, _ := p.getBestFromLocal(p.configuredRFlist()) if len(paths) > 0 { p.adjRibOut.Update(paths) - msgs = append(msgs, newSenderMsg(p, paths, nil, false)) + sendFsmOutgoingMsg(p, paths, nil, false) } } log.WithFields(log.Fields{ @@ -797,8 +754,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { "Topic": "Peer", "Key": peer.fsm.pConf.Config.NeighborAddress, }).Debugf("withdraw %d stale routes", len(pathList)) - m, _ := server.propagateUpdate(peer, pathList) - msgs = append(msgs, m...) + server.propagateUpdate(peer, pathList) } // we don't delay non-route-target NLRIs when peer is restarting @@ -820,7 +776,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { } if paths, _ := peer.getBestFromLocal(families); len(paths) > 0 { peer.adjRibOut.Update(paths) - msgs = append(msgs, newSenderMsg(peer, paths, nil, false)) + sendFsmOutgoingMsg(peer, paths, nil, false) } } } @@ -832,7 +788,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { }).Panic("unknown msg type") } } - return msgs + return } func (server *BgpServer) SetGlobalType(g config.Global) error { @@ -1493,7 +1449,7 @@ func (server *BgpServer) handleModConfig(grpcReq *GrpcRequest) error { c = arg case *api.StopServerRequest: for k, _ := range server.neighborMap { - _, err := server.handleDeleteNeighborRequest(&GrpcRequest{ + err := server.handleDeleteNeighborRequest(&GrpcRequest{ Data: &api.DeleteNeighborRequest{ Peer: &api.Peer{ Conf: &api.PeerConf{ @@ -1554,9 +1510,7 @@ func sendMultipleResponses(grpcReq *GrpcRequest, results []*GrpcResponse) { } } -func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { - var msgs []*SenderMsg - +func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { logOp := func(addr string, action string) { log.WithFields(log.Fields{ "Topic": "Operation", @@ -1581,7 +1535,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { ResponseErr: fmt.Errorf("bgpd main loop is not started yet"), } close(grpcReq.ResponseCh) - return nil + return } var err error @@ -1709,12 +1663,12 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { case REQ_ADD_PATH: pathList := server.handleAddPathRequest(grpcReq) if len(pathList) > 0 { - msgs, _ = server.propagateUpdate(nil, pathList) + server.propagateUpdate(nil, pathList) } case REQ_DELETE_PATH: pathList := server.handleDeletePathRequest(grpcReq) if len(pathList) > 0 { - msgs, _ = server.propagateUpdate(nil, pathList) + server.propagateUpdate(nil, pathList) } case REQ_BMP_NEIGHBORS: //TODO: merge REQ_NEIGHBORS and REQ_BMP_NEIGHBORS @@ -1838,7 +1792,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { logOp(grpcReq.Name, "Neighbor shutdown") m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) for _, peer := range peers { - msgs = append(msgs, newSenderMsg(peer, nil, m, false)) + sendFsmOutgoingMsg(peer, nil, m, false) } grpcReq.ResponseCh <- &GrpcResponse{Data: &api.ShutdownNeighborResponse{}} close(grpcReq.ResponseCh) @@ -1852,7 +1806,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) for _, peer := range peers { peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset - msgs = append(msgs, newSenderMsg(peer, nil, m, false)) + sendFsmOutgoingMsg(peer, nil, m, false) } grpcReq.ResponseCh <- &GrpcResponse{Data: &api.ResetNeighborResponse{}} close(grpcReq.ResponseCh) @@ -1897,8 +1851,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { } } peer.adjRibIn.RefreshAcceptedNumber(families) - m, _ := server.propagateUpdate(peer, pathList) - msgs = append(msgs, m...) + server.propagateUpdate(peer, pathList) } if grpcReq.RequestType == REQ_NEIGHBOR_SOFT_RESET_IN { @@ -1949,7 +1902,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { pathList, filtered := peer.getBestFromLocal(families) if len(pathList) > 0 { peer.adjRibOut.Update(pathList) - msgs = append(msgs, newSenderMsg(peer, pathList, nil, false)) + sendFsmOutgoingMsg(peer, pathList, nil, false) } if grpcReq.RequestType != REQ_DEFERRAL_TIMER_EXPIRED && len(filtered) > 0 { withdrawnList := make([]*table.Path, 0, len(filtered)) @@ -1965,7 +1918,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { withdrawnList = append(withdrawnList, p.Clone(true)) } } - msgs = append(msgs, newSenderMsg(peer, withdrawnList, nil, false)) + sendFsmOutgoingMsg(peer, withdrawnList, nil, false) } } grpcReq.ResponseCh <- &GrpcResponse{Data: &api.SoftResetNeighborResponse{}} @@ -2005,46 +1958,37 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) case REQ_GRPC_ADD_NEIGHBOR: - _, err := server.handleAddNeighborRequest(grpcReq) + err := server.handleAddNeighborRequest(grpcReq) grpcReq.ResponseCh <- &GrpcResponse{ Data: &api.AddNeighborResponse{}, ResponseErr: err, } close(grpcReq.ResponseCh) case REQ_GRPC_DELETE_NEIGHBOR: - m, err := server.handleDeleteNeighborRequest(grpcReq) + err := server.handleDeleteNeighborRequest(grpcReq) grpcReq.ResponseCh <- &GrpcResponse{ Data: &api.DeleteNeighborResponse{}, ResponseErr: err, } - if len(m) > 0 { - msgs = append(msgs, m...) - } close(grpcReq.ResponseCh) case REQ_ADD_NEIGHBOR: - _, err := server.handleAddNeighbor(grpcReq.Data.(*config.Neighbor)) + err := server.handleAddNeighbor(grpcReq.Data.(*config.Neighbor)) grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, } close(grpcReq.ResponseCh) case REQ_DEL_NEIGHBOR: - m, err := server.handleDelNeighbor(grpcReq.Data.(*config.Neighbor), bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED) + err := server.handleDelNeighbor(grpcReq.Data.(*config.Neighbor), bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED) grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, } - if len(m) > 0 { - msgs = append(msgs, m...) - } close(grpcReq.ResponseCh) case REQ_UPDATE_NEIGHBOR: - m, policyUpdated, err := server.handleUpdateNeighbor(grpcReq.Data.(*config.Neighbor)) + policyUpdated, err := server.handleUpdateNeighbor(grpcReq.Data.(*config.Neighbor)) grpcReq.ResponseCh <- &GrpcResponse{ Data: policyUpdated, ResponseErr: err, } - if len(m) > 0 { - msgs = append(msgs, m...) - } close(grpcReq.ResponseCh) case REQ_GET_DEFINED_SET: rsp, err := server.handleGrpcGetDefinedSet(grpcReq) @@ -2175,7 +2119,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { case REQ_INJECT_MRT: pathList := server.handleInjectMrtRequest(grpcReq) if len(pathList) > 0 { - msgs, _ = server.propagateUpdate(nil, pathList) + server.propagateUpdate(nil, pathList) grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) } @@ -2197,7 +2141,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { case REQ_VRF, REQ_GET_VRF, REQ_ADD_VRF, REQ_DELETE_VRF: pathList := server.handleVrfRequest(grpcReq) if len(pathList) > 0 { - msgs, _ = server.propagateUpdate(nil, pathList) + server.propagateUpdate(nil, pathList) } case REQ_RELOAD_POLICY: err := server.handlePolicy(grpcReq.Data.(config.RoutingPolicy)) @@ -2242,13 +2186,13 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR } - return msgs + return ERROR: grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, } close(grpcReq.ResponseCh) - return msgs + return } func (server *BgpServer) handleGrpcGetDefinedSet(grpcReq *GrpcRequest) (*api.GetDefinedSetResponse, error) { @@ -2265,10 +2209,10 @@ func (server *BgpServer) handleGrpcGetDefinedSet(grpcReq *GrpcRequest) (*api.Get return &api.GetDefinedSetResponse{Sets: sets}, nil } -func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) ([]*SenderMsg, error) { +func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) error { addr := c.Config.NeighborAddress if _, y := server.neighborMap[addr]; y { - return nil, fmt.Errorf("Can't overwrite the exising peer: %s", addr) + return fmt.Errorf("Can't overwrite the exising peer: %s", addr) } if server.bgpConfig.Global.Config.Port > 0 { @@ -2297,14 +2241,14 @@ func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) ([]*SenderMsg, er server.neighborMap[addr] = peer peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) - return nil, nil + return nil } -func (server *BgpServer) handleDelNeighbor(c *config.Neighbor, code, subcode uint8) ([]*SenderMsg, error) { +func (server *BgpServer) handleDelNeighbor(c *config.Neighbor, code, subcode uint8) error { addr := c.Config.NeighborAddress n, y := server.neighborMap[addr] if !y { - return nil, fmt.Errorf("Can't delete a peer configuration for %s", addr) + return fmt.Errorf("Can't delete a peer configuration for %s", addr) } for _, l := range server.Listeners(addr) { SetTcpMD5SigSockopts(l, addr, "") @@ -2324,11 +2268,11 @@ func (server *BgpServer) handleDelNeighbor(c *config.Neighbor, code, subcode uin t.Stop() }(addr) delete(server.neighborMap, addr) - m := server.dropPeerAllRoutes(n, n.configuredRFlist()) - return m, nil + server.dropPeerAllRoutes(n, n.configuredRFlist()) + return nil } -func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg, bool, error) { +func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) (bool, error) { addr := c.Config.NeighborAddress peer := server.neighborMap[addr] policyUpdated := false @@ -2360,23 +2304,21 @@ func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg, } else if original.Config.PeerAs != c.Config.PeerAs { sub = bgp.BGP_ERROR_SUB_PEER_DECONFIGURED } - msgs, err := server.handleDelNeighbor(peer.fsm.pConf, bgp.BGP_ERROR_CEASE, sub) - if err != nil { + if err := server.handleDelNeighbor(peer.fsm.pConf, bgp.BGP_ERROR_CEASE, sub); err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": addr, }).Error(err) - return msgs, policyUpdated, err + return policyUpdated, err } - msgs2, err := server.handleAddNeighbor(c) - msgs = append(msgs, msgs2...) + err := server.handleAddNeighbor(c) if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": addr, }).Error(err) } - return msgs, policyUpdated, err + return policyUpdated, err } if !original.Timers.Config.Equal(&c.Timers.Config) { @@ -2387,7 +2329,7 @@ func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg, peer.fsm.pConf.Timers.Config = c.Timers.Config } - msgs, err := peer.updatePrefixLimitConfig(c.AfiSafis) + err := peer.updatePrefixLimitConfig(c.AfiSafis) if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", @@ -2395,15 +2337,14 @@ func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg, }).Error(err) // rollback to original state peer.fsm.pConf = original - return nil, policyUpdated, err } - return msgs, policyUpdated, nil + return policyUpdated, err } -func (server *BgpServer) handleAddNeighborRequest(grpcReq *GrpcRequest) ([]*SenderMsg, error) { +func (server *BgpServer) handleAddNeighborRequest(grpcReq *GrpcRequest) error { arg, ok := grpcReq.Data.(*api.AddNeighborRequest) if !ok { - return []*SenderMsg{}, fmt.Errorf("AddNeighborRequest type assertion failed") + return fmt.Errorf("AddNeighborRequest type assertion failed") } else { apitoConfig := func(a *api.Peer) (*config.Neighbor, error) { pconf := &config.Neighbor{} @@ -2517,13 +2458,13 @@ func (server *BgpServer) handleAddNeighborRequest(grpcReq *GrpcRequest) ([]*Send } c, err := apitoConfig(arg.Peer) if err != nil { - return nil, err + return err } return server.handleAddNeighbor(c) } } -func (server *BgpServer) handleDeleteNeighborRequest(grpcReq *GrpcRequest) ([]*SenderMsg, error) { +func (server *BgpServer) handleDeleteNeighborRequest(grpcReq *GrpcRequest) error { arg := grpcReq.Data.(*api.DeleteNeighborRequest) return server.handleDelNeighbor(&config.Neighbor{ Config: config.NeighborConfig{ |