summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-14 16:54:53 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-14 16:49:34 +0900
commit1f5a66a33c485a97ae80688523f526dd945aabc1 (patch)
treec3e645d2268231ee9c3eb141ca479684165b739b
parentca2e7730bd8838b1f633e1f1808e0edce9269162 (diff)
server: replace SenderMsg workaround with InfiniteChannel
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/fsm.go11
-rw-r--r--server/peer.go19
-rw-r--r--server/server.go197
3 files changed, 84 insertions, 143 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 3069893b..e0fb53c2 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -360,12 +360,12 @@ type FSMHandler struct {
errorCh chan FsmStateReason
incoming *channels.InfiniteChannel
stateCh chan *FsmMsg
- outgoing chan *FsmOutgoingMsg
+ outgoing *channels.InfiniteChannel
holdTimerResetCh chan bool
sentNotification string
}
-func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *FsmOutgoingMsg) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing *channels.InfiniteChannel) *FSMHandler {
h := &FSMHandler{
fsm: fsm,
errorCh: make(chan FsmStateReason, 2),
@@ -1077,7 +1077,8 @@ func (h *FSMHandler) sendMessageloop() error {
select {
case <-h.t.Dying():
return nil
- case m := <-h.outgoing:
+ case o := <-h.outgoing.Out():
+ m := o.(*FsmOutgoingMsg)
for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths) {
if err := send(msg); err != nil {
return nil
@@ -1163,7 +1164,7 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) {
"State": fsm.state.String(),
}).Warn("hold timer expired")
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil)
- h.outgoing <- &FsmOutgoingMsg{Notification: m}
+ h.outgoing.In() <- &FsmOutgoingMsg{Notification: m}
return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED
case <-h.holdTimerResetCh:
if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 {
@@ -1175,7 +1176,7 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) {
switch s {
case ADMIN_STATE_DOWN:
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
- h.outgoing <- &FsmOutgoingMsg{Notification: m}
+ h.outgoing.In() <- &FsmOutgoingMsg{Notification: m}
}
}
}
diff --git a/server/peer.go b/server/peer.go
index 62716e4c..1058f2e9 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -38,7 +38,7 @@ type Peer struct {
fsm *FSM
adjRibIn *table.AdjRib
adjRibOut *table.AdjRib
- outgoing chan *FsmOutgoingMsg
+ outgoing *channels.InfiniteChannel
policy *table.RoutingPolicy
localRib *table.TableManager
prefixLimitWarned map[bgp.RouteFamily]bool
@@ -46,7 +46,7 @@ type Peer struct {
func NewPeer(g *config.Global, conf *config.Neighbor, loc *table.TableManager, policy *table.RoutingPolicy) *Peer {
peer := &Peer{
- outgoing: make(chan *FsmOutgoingMsg, 128),
+ outgoing: channels.NewInfiniteChannel(),
localRib: loc,
policy: policy,
fsm: NewFSM(g, conf, policy),
@@ -282,28 +282,27 @@ func (peer *Peer) doPrefixLimit(k bgp.RouteFamily, c *config.PrefixLimitConfig)
}
-func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) ([]*SenderMsg, error) {
+func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) error {
x := peer.fsm.pConf.AfiSafis
y := c
if len(x) != len(y) {
- return nil, fmt.Errorf("changing supported afi-safi is not allowed")
+ return fmt.Errorf("changing supported afi-safi is not allowed")
}
m := make(map[bgp.RouteFamily]config.PrefixLimitConfig)
for _, e := range x {
k, err := bgp.GetRouteFamily(string(e.Config.AfiSafiName))
if err != nil {
- return nil, err
+ return err
}
m[k] = e.PrefixLimit.Config
}
- msgs := make([]*SenderMsg, 0, len(y))
for _, e := range y {
k, err := bgp.GetRouteFamily(string(e.Config.AfiSafiName))
if err != nil {
- return nil, err
+ return err
}
if p, ok := m[k]; !ok {
- return nil, fmt.Errorf("changing supported afi-safi is not allowed")
+ return fmt.Errorf("changing supported afi-safi is not allowed")
} else if !p.Equal(&e.PrefixLimit.Config) {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -316,12 +315,12 @@ func (peer *Peer) updatePrefixLimitConfig(c []config.AfiSafi) ([]*SenderMsg, err
}).Warnf("update prefix limit configuration")
peer.prefixLimitWarned[k] = false
if msg := peer.doPrefixLimit(k, &e.PrefixLimit.Config); msg != nil {
- msgs = append(msgs, newSenderMsg(peer, nil, msg, true))
+ sendFsmOutgoingMsg(peer, nil, msg, true)
}
}
}
peer.fsm.pConf.AfiSafis = c
- return msgs, nil
+ return nil
}
func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily, *bgp.BGPMessage) {
diff --git a/server/server.go b/server/server.go
index 539c4086..757b645e 100644
--- a/server/server.go
+++ b/server/server.go
@@ -37,11 +37,6 @@ import (
var policyMutex sync.RWMutex
-type SenderMsg struct {
- ch chan *FsmOutgoingMsg
- msg *FsmOutgoingMsg
-}
-
type TCPListener struct {
l *net.TCPListener
ch chan struct{}
@@ -140,25 +135,9 @@ func (server *BgpServer) Serve() {
w, _ := newGrpcWatcher()
server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w)
- senderCh := make(chan *SenderMsg, 1<<16)
- go func(ch chan *SenderMsg) {
- w := func(c chan *FsmOutgoingMsg, msg *FsmOutgoingMsg) {
- // nasty but the peer could already become non established state before here.
- defer func() { recover() }()
- c <- msg
- }
-
- for m := range ch {
- // TODO: must be more clever. Slow peer makes other peers slow too.
- w(m.ch, m.msg)
- }
-
- }(senderCh)
-
server.listeners = make([]*TCPListener, 0, 2)
server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
- var senderMsgs []*SenderMsg
handleFsmMsg := func(e *FsmMsg) {
peer, found := server.neighborMap[e.MsgSrc]
@@ -170,20 +149,10 @@ func (server *BgpServer) Serve() {
log.Debug("FSM Version inconsistent")
return
}
- m := server.handleFSMMessage(peer, e)
- if len(m) > 0 {
- senderMsgs = append(senderMsgs, m...)
- }
+ server.handleFSMMessage(peer, e)
}
for {
- var firstMsg *SenderMsg
- var sCh chan *SenderMsg
- if len(senderMsgs) > 0 {
- sCh = senderCh
- firstMsg = senderMsgs[0]
- }
-
passConn := func(conn *net.TCPConn) {
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
ipaddr, _ := net.ResolveIPAddr("ip", host)
@@ -231,10 +200,7 @@ func (server *BgpServer) Serve() {
select {
case grpcReq := <-server.GrpcReqCh:
- m := server.handleGrpc(grpcReq)
- if len(m) > 0 {
- senderMsgs = append(senderMsgs, m...)
- }
+ server.handleGrpc(grpcReq)
case conn := <-server.acceptCh:
passConn(conn)
default:
@@ -262,25 +228,17 @@ func (server *BgpServer) Serve() {
handleFsmMsg(e.(*FsmMsg))
case e := <-server.fsmStateCh:
handleFsmMsg(e)
- case sCh <- firstMsg:
- senderMsgs = senderMsgs[1:]
case grpcReq := <-server.GrpcReqCh:
- m := server.handleGrpc(grpcReq)
- if len(m) > 0 {
- senderMsgs = append(senderMsgs, m...)
- }
+ server.handleGrpc(grpcReq)
}
}
}
-func newSenderMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) *SenderMsg {
- return &SenderMsg{
- ch: peer.outgoing,
- msg: &FsmOutgoingMsg{
- Paths: paths,
- Notification: notification,
- StayIdle: stayIdle,
- },
+func sendFsmOutgoingMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) {
+ peer.outgoing.In() <- &FsmOutgoingMsg{
+ Paths: paths,
+ Notification: notification,
+ StayIdle: stayIdle,
}
}
@@ -387,9 +345,8 @@ func filterpath(peer *Peer, path *table.Path) *table.Path {
return path
}
-func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) []*SenderMsg {
+func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) {
ids := make([]string, 0, len(server.neighborMap))
- msgs := make([]*SenderMsg, 0, len(server.neighborMap))
if peer.isRouteServerClient() {
for _, targetPeer := range server.neighborMap {
if !targetPeer.isRouteServerClient() || targetPeer == peer || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
@@ -412,11 +369,10 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
continue
}
if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()], nil); len(paths) > 0 {
- msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false))
+ sendFsmOutgoingMsg(targetPeer, paths, nil, false)
}
}
}
- return msgs
}
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
@@ -470,11 +426,10 @@ func (server *BgpServer) RSimportPaths(peer *Peer, pathList []*table.Path) []*ta
return moded
}
-func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]*SenderMsg, []*table.Path) {
+func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []*table.Path {
rib := server.globalRib
var alteredPathList, withdrawn []*table.Path
var best map[string][]*table.Path
- msgs := make([]*SenderMsg, 0, len(server.neighborMap))
if peer != nil && peer.isRouteServerClient() {
for _, path := range pathList {
@@ -545,14 +500,14 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
} else {
paths = peer.processOutgoingPaths(paths, nil)
}
- msgs = append(msgs, newSenderMsg(peer, paths, nil, false))
+ sendFsmOutgoingMsg(peer, paths, nil, false)
}
}
alteredPathList = pathList
var multi [][]*table.Path
best, withdrawn, multi = rib.ProcessPaths([]string{table.GLOBAL_RIB_NAME}, pathList)
if len(best[table.GLOBAL_RIB_NAME]) == 0 {
- return nil, alteredPathList
+ return alteredPathList
}
server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
}
@@ -562,14 +517,13 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
continue
}
if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()], withdrawn); len(paths) > 0 {
- msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false))
+ sendFsmOutgoingMsg(targetPeer, paths, nil, false)
}
}
- return msgs, alteredPathList
+ return alteredPathList
}
-func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
- var msgs []*SenderMsg
+func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
switch e.MsgType {
case FSM_MSG_STATE_CHANGE:
nextState := e.MsgData.(bgp.FSMState)
@@ -593,7 +547,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}
peer.prefixLimitWarned = make(map[bgp.RouteFamily]bool)
peer.DropAll(drop)
- msgs = server.dropPeerAllRoutes(peer, drop)
+ server.dropPeerAllRoutes(peer, drop)
} else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE {
// RFC 4724 4.2
// If the session does not get re-established within the "Restart Time"
@@ -601,11 +555,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
// delete all the stale routes from the peer that it is retaining.
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false
peer.DropAll(peer.configuredRFlist())
- msgs = server.dropPeerAllRoutes(peer, peer.configuredRFlist())
+ server.dropPeerAllRoutes(peer, peer.configuredRFlist())
}
- close(peer.outgoing)
- peer.outgoing = make(chan *FsmOutgoingMsg, 128)
+ peer.outgoing.Close()
+ peer.outgoing = channels.NewInfiniteChannel()
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
@@ -643,7 +597,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
- msgs = []*SenderMsg{newSenderMsg(peer, pathList, nil, false)}
+ sendFsmOutgoingMsg(peer, pathList, nil, false)
}
} else {
// RFC 4724 4.1
@@ -683,17 +637,20 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
server.broadcastPeerState(peer, oldState)
case FSM_MSG_ROUTE_REFRESH:
if paths := peer.handleRouteRefresh(e); len(paths) > 0 {
- return []*SenderMsg{newSenderMsg(peer, paths, nil, false)}
+ sendFsmOutgoingMsg(peer, paths, nil, false)
+ return
}
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
case *bgp.MessageError:
- return []*SenderMsg{newSenderMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)}
+ sendFsmOutgoingMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)
+ return
case *bgp.BGPMessage:
server.roaManager.validate(e.PathList)
pathList, eor, notification := peer.handleUpdate(e)
if notification != nil {
- return []*SenderMsg{newSenderMsg(peer, nil, notification, true)}
+ sendFsmOutgoingMsg(peer, nil, notification, true)
+ return
}
if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
@@ -716,7 +673,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
if len(pathList) > 0 {
var altered []*table.Path
- msgs, altered = server.propagateUpdate(peer, pathList)
+ altered = server.propagateUpdate(peer, pathList)
if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
@@ -777,7 +734,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
paths, _ := p.getBestFromLocal(p.configuredRFlist())
if len(paths) > 0 {
p.adjRibOut.Update(paths)
- msgs = append(msgs, newSenderMsg(p, paths, nil, false))
+ sendFsmOutgoingMsg(p, paths, nil, false)
}
}
log.WithFields(log.Fields{
@@ -797,8 +754,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
"Topic": "Peer",
"Key": peer.fsm.pConf.Config.NeighborAddress,
}).Debugf("withdraw %d stale routes", len(pathList))
- m, _ := server.propagateUpdate(peer, pathList)
- msgs = append(msgs, m...)
+ server.propagateUpdate(peer, pathList)
}
// we don't delay non-route-target NLRIs when peer is restarting
@@ -820,7 +776,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}
if paths, _ := peer.getBestFromLocal(families); len(paths) > 0 {
peer.adjRibOut.Update(paths)
- msgs = append(msgs, newSenderMsg(peer, paths, nil, false))
+ sendFsmOutgoingMsg(peer, paths, nil, false)
}
}
}
@@ -832,7 +788,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}).Panic("unknown msg type")
}
}
- return msgs
+ return
}
func (server *BgpServer) SetGlobalType(g config.Global) error {
@@ -1493,7 +1449,7 @@ func (server *BgpServer) handleModConfig(grpcReq *GrpcRequest) error {
c = arg
case *api.StopServerRequest:
for k, _ := range server.neighborMap {
- _, err := server.handleDeleteNeighborRequest(&GrpcRequest{
+ err := server.handleDeleteNeighborRequest(&GrpcRequest{
Data: &api.DeleteNeighborRequest{
Peer: &api.Peer{
Conf: &api.PeerConf{
@@ -1554,9 +1510,7 @@ func sendMultipleResponses(grpcReq *GrpcRequest, results []*GrpcResponse) {
}
}
-func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
- var msgs []*SenderMsg
-
+func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
logOp := func(addr string, action string) {
log.WithFields(log.Fields{
"Topic": "Operation",
@@ -1581,7 +1535,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
ResponseErr: fmt.Errorf("bgpd main loop is not started yet"),
}
close(grpcReq.ResponseCh)
- return nil
+ return
}
var err error
@@ -1709,12 +1663,12 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
case REQ_ADD_PATH:
pathList := server.handleAddPathRequest(grpcReq)
if len(pathList) > 0 {
- msgs, _ = server.propagateUpdate(nil, pathList)
+ server.propagateUpdate(nil, pathList)
}
case REQ_DELETE_PATH:
pathList := server.handleDeletePathRequest(grpcReq)
if len(pathList) > 0 {
- msgs, _ = server.propagateUpdate(nil, pathList)
+ server.propagateUpdate(nil, pathList)
}
case REQ_BMP_NEIGHBORS:
//TODO: merge REQ_NEIGHBORS and REQ_BMP_NEIGHBORS
@@ -1838,7 +1792,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
logOp(grpcReq.Name, "Neighbor shutdown")
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
for _, peer := range peers {
- msgs = append(msgs, newSenderMsg(peer, nil, m, false))
+ sendFsmOutgoingMsg(peer, nil, m, false)
}
grpcReq.ResponseCh <- &GrpcResponse{Data: &api.ShutdownNeighborResponse{}}
close(grpcReq.ResponseCh)
@@ -1852,7 +1806,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
for _, peer := range peers {
peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset
- msgs = append(msgs, newSenderMsg(peer, nil, m, false))
+ sendFsmOutgoingMsg(peer, nil, m, false)
}
grpcReq.ResponseCh <- &GrpcResponse{Data: &api.ResetNeighborResponse{}}
close(grpcReq.ResponseCh)
@@ -1897,8 +1851,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
}
}
peer.adjRibIn.RefreshAcceptedNumber(families)
- m, _ := server.propagateUpdate(peer, pathList)
- msgs = append(msgs, m...)
+ server.propagateUpdate(peer, pathList)
}
if grpcReq.RequestType == REQ_NEIGHBOR_SOFT_RESET_IN {
@@ -1949,7 +1902,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
pathList, filtered := peer.getBestFromLocal(families)
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(peer, pathList, nil, false))
+ sendFsmOutgoingMsg(peer, pathList, nil, false)
}
if grpcReq.RequestType != REQ_DEFERRAL_TIMER_EXPIRED && len(filtered) > 0 {
withdrawnList := make([]*table.Path, 0, len(filtered))
@@ -1965,7 +1918,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
withdrawnList = append(withdrawnList, p.Clone(true))
}
}
- msgs = append(msgs, newSenderMsg(peer, withdrawnList, nil, false))
+ sendFsmOutgoingMsg(peer, withdrawnList, nil, false)
}
}
grpcReq.ResponseCh <- &GrpcResponse{Data: &api.SoftResetNeighborResponse{}}
@@ -2005,46 +1958,37 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
grpcReq.ResponseCh <- result
close(grpcReq.ResponseCh)
case REQ_GRPC_ADD_NEIGHBOR:
- _, err := server.handleAddNeighborRequest(grpcReq)
+ err := server.handleAddNeighborRequest(grpcReq)
grpcReq.ResponseCh <- &GrpcResponse{
Data: &api.AddNeighborResponse{},
ResponseErr: err,
}
close(grpcReq.ResponseCh)
case REQ_GRPC_DELETE_NEIGHBOR:
- m, err := server.handleDeleteNeighborRequest(grpcReq)
+ err := server.handleDeleteNeighborRequest(grpcReq)
grpcReq.ResponseCh <- &GrpcResponse{
Data: &api.DeleteNeighborResponse{},
ResponseErr: err,
}
- if len(m) > 0 {
- msgs = append(msgs, m...)
- }
close(grpcReq.ResponseCh)
case REQ_ADD_NEIGHBOR:
- _, err := server.handleAddNeighbor(grpcReq.Data.(*config.Neighbor))
+ err := server.handleAddNeighbor(grpcReq.Data.(*config.Neighbor))
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
}
close(grpcReq.ResponseCh)
case REQ_DEL_NEIGHBOR:
- m, err := server.handleDelNeighbor(grpcReq.Data.(*config.Neighbor), bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED)
+ err := server.handleDelNeighbor(grpcReq.Data.(*config.Neighbor), bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED)
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
}
- if len(m) > 0 {
- msgs = append(msgs, m...)
- }
close(grpcReq.ResponseCh)
case REQ_UPDATE_NEIGHBOR:
- m, policyUpdated, err := server.handleUpdateNeighbor(grpcReq.Data.(*config.Neighbor))
+ policyUpdated, err := server.handleUpdateNeighbor(grpcReq.Data.(*config.Neighbor))
grpcReq.ResponseCh <- &GrpcResponse{
Data: policyUpdated,
ResponseErr: err,
}
- if len(m) > 0 {
- msgs = append(msgs, m...)
- }
close(grpcReq.ResponseCh)
case REQ_GET_DEFINED_SET:
rsp, err := server.handleGrpcGetDefinedSet(grpcReq)
@@ -2175,7 +2119,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
case REQ_INJECT_MRT:
pathList := server.handleInjectMrtRequest(grpcReq)
if len(pathList) > 0 {
- msgs, _ = server.propagateUpdate(nil, pathList)
+ server.propagateUpdate(nil, pathList)
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
}
@@ -2197,7 +2141,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
case REQ_VRF, REQ_GET_VRF, REQ_ADD_VRF, REQ_DELETE_VRF:
pathList := server.handleVrfRequest(grpcReq)
if len(pathList) > 0 {
- msgs, _ = server.propagateUpdate(nil, pathList)
+ server.propagateUpdate(nil, pathList)
}
case REQ_RELOAD_POLICY:
err := server.handlePolicy(grpcReq.Data.(config.RoutingPolicy))
@@ -2242,13 +2186,13 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType)
goto ERROR
}
- return msgs
+ return
ERROR:
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
}
close(grpcReq.ResponseCh)
- return msgs
+ return
}
func (server *BgpServer) handleGrpcGetDefinedSet(grpcReq *GrpcRequest) (*api.GetDefinedSetResponse, error) {
@@ -2265,10 +2209,10 @@ func (server *BgpServer) handleGrpcGetDefinedSet(grpcReq *GrpcRequest) (*api.Get
return &api.GetDefinedSetResponse{Sets: sets}, nil
}
-func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) ([]*SenderMsg, error) {
+func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) error {
addr := c.Config.NeighborAddress
if _, y := server.neighborMap[addr]; y {
- return nil, fmt.Errorf("Can't overwrite the exising peer: %s", addr)
+ return fmt.Errorf("Can't overwrite the exising peer: %s", addr)
}
if server.bgpConfig.Global.Config.Port > 0 {
@@ -2297,14 +2241,14 @@ func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) ([]*SenderMsg, er
server.neighborMap[addr] = peer
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE)
- return nil, nil
+ return nil
}
-func (server *BgpServer) handleDelNeighbor(c *config.Neighbor, code, subcode uint8) ([]*SenderMsg, error) {
+func (server *BgpServer) handleDelNeighbor(c *config.Neighbor, code, subcode uint8) error {
addr := c.Config.NeighborAddress
n, y := server.neighborMap[addr]
if !y {
- return nil, fmt.Errorf("Can't delete a peer configuration for %s", addr)
+ return fmt.Errorf("Can't delete a peer configuration for %s", addr)
}
for _, l := range server.Listeners(addr) {
SetTcpMD5SigSockopts(l, addr, "")
@@ -2324,11 +2268,11 @@ func (server *BgpServer) handleDelNeighbor(c *config.Neighbor, code, subcode uin
t.Stop()
}(addr)
delete(server.neighborMap, addr)
- m := server.dropPeerAllRoutes(n, n.configuredRFlist())
- return m, nil
+ server.dropPeerAllRoutes(n, n.configuredRFlist())
+ return nil
}
-func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg, bool, error) {
+func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) (bool, error) {
addr := c.Config.NeighborAddress
peer := server.neighborMap[addr]
policyUpdated := false
@@ -2360,23 +2304,21 @@ func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg,
} else if original.Config.PeerAs != c.Config.PeerAs {
sub = bgp.BGP_ERROR_SUB_PEER_DECONFIGURED
}
- msgs, err := server.handleDelNeighbor(peer.fsm.pConf, bgp.BGP_ERROR_CEASE, sub)
- if err != nil {
+ if err := server.handleDelNeighbor(peer.fsm.pConf, bgp.BGP_ERROR_CEASE, sub); err != nil {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": addr,
}).Error(err)
- return msgs, policyUpdated, err
+ return policyUpdated, err
}
- msgs2, err := server.handleAddNeighbor(c)
- msgs = append(msgs, msgs2...)
+ err := server.handleAddNeighbor(c)
if err != nil {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": addr,
}).Error(err)
}
- return msgs, policyUpdated, err
+ return policyUpdated, err
}
if !original.Timers.Config.Equal(&c.Timers.Config) {
@@ -2387,7 +2329,7 @@ func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg,
peer.fsm.pConf.Timers.Config = c.Timers.Config
}
- msgs, err := peer.updatePrefixLimitConfig(c.AfiSafis)
+ err := peer.updatePrefixLimitConfig(c.AfiSafis)
if err != nil {
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -2395,15 +2337,14 @@ func (server *BgpServer) handleUpdateNeighbor(c *config.Neighbor) ([]*SenderMsg,
}).Error(err)
// rollback to original state
peer.fsm.pConf = original
- return nil, policyUpdated, err
}
- return msgs, policyUpdated, nil
+ return policyUpdated, err
}
-func (server *BgpServer) handleAddNeighborRequest(grpcReq *GrpcRequest) ([]*SenderMsg, error) {
+func (server *BgpServer) handleAddNeighborRequest(grpcReq *GrpcRequest) error {
arg, ok := grpcReq.Data.(*api.AddNeighborRequest)
if !ok {
- return []*SenderMsg{}, fmt.Errorf("AddNeighborRequest type assertion failed")
+ return fmt.Errorf("AddNeighborRequest type assertion failed")
} else {
apitoConfig := func(a *api.Peer) (*config.Neighbor, error) {
pconf := &config.Neighbor{}
@@ -2517,13 +2458,13 @@ func (server *BgpServer) handleAddNeighborRequest(grpcReq *GrpcRequest) ([]*Send
}
c, err := apitoConfig(arg.Peer)
if err != nil {
- return nil, err
+ return err
}
return server.handleAddNeighbor(c)
}
}
-func (server *BgpServer) handleDeleteNeighborRequest(grpcReq *GrpcRequest) ([]*SenderMsg, error) {
+func (server *BgpServer) handleDeleteNeighborRequest(grpcReq *GrpcRequest) error {
arg := grpcReq.Data.(*api.DeleteNeighborRequest)
return server.handleDelNeighbor(&config.Neighbor{
Config: config.NeighborConfig{