summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go94
1 files changed, 44 insertions, 50 deletions
diff --git a/server/server.go b/server/server.go
index 7f84d5db..af88f3ca 100644
--- a/server/server.go
+++ b/server/server.go
@@ -38,9 +38,8 @@ import (
var policyMutex sync.RWMutex
type SenderMsg struct {
- messages []*bgp.BGPMessage
- sendCh chan *bgp.BGPMessage
- destination string
+ ch chan *FsmOutgoingMsg
+ msg *FsmOutgoingMsg
}
type broadcastMsg interface {
@@ -206,19 +205,17 @@ func (server *BgpServer) Serve() {
senderCh := make(chan *SenderMsg, 1<<16)
go func(ch chan *SenderMsg) {
- for {
- // TODO: must be more clever. Slow peer makes other peers slow too.
- m := <-ch
- w := func(c chan *bgp.BGPMessage, msg *bgp.BGPMessage) {
- // nasty but the peer could already become non established state before here.
- defer func() { recover() }()
- c <- msg
- }
+ w := func(c chan *FsmOutgoingMsg, msg *FsmOutgoingMsg) {
+ // nasty but the peer could already become non established state before here.
+ defer func() { recover() }()
+ c <- msg
+ }
- for _, b := range m.messages {
- w(m.sendCh, b)
- }
+ for m := range ch {
+ // TODO: must be more clever. Slow peer makes other peers slow too.
+ w(m.ch, m.msg)
}
+
}(senderCh)
broadcastCh := make(chan broadcastMsg, 8)
@@ -356,11 +353,14 @@ func (server *BgpServer) Serve() {
}
}
-func newSenderMsg(peer *Peer, messages []*bgp.BGPMessage) *SenderMsg {
+func newSenderMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) *SenderMsg {
return &SenderMsg{
- messages: messages,
- sendCh: peer.outgoing,
- destination: peer.conf.Config.NeighborAddress,
+ ch: peer.outgoing,
+ msg: &FsmOutgoingMsg{
+ Paths: paths,
+ Notification: notification,
+ StayIdle: stayIdle,
+ },
}
}
@@ -451,9 +451,8 @@ func filterpath(peer *Peer, path *table.Path) *table.Path {
}
func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) []*SenderMsg {
- msgs := make([]*SenderMsg, 0)
-
ids := make([]string, 0, len(server.neighborMap))
+ msgs := make([]*SenderMsg, 0, len(server.neighborMap))
if peer.isRouteServerClient() {
for _, targetPeer := range server.neighborMap {
if !targetPeer.isRouteServerClient() || targetPeer == peer || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
@@ -477,8 +476,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
continue
}
if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 {
- msgList := table.CreateUpdateMsgFromPaths(paths)
- msgs = append(msgs, newSenderMsg(targetPeer, msgList))
+ msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false))
}
}
}
@@ -741,22 +739,20 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
}
}
- msgs := make([]*SenderMsg, 0)
+ msgs := make([]*SenderMsg, 0, len(server.neighborMap))
for _, targetPeer := range server.neighborMap {
if (peer == nil && targetPeer.isRouteServerClient()) || (peer != nil && peer.isRouteServerClient() != targetPeer.isRouteServerClient()) {
continue
}
if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 {
- msgList := table.CreateUpdateMsgFromPaths(paths)
- msgs = append(msgs, newSenderMsg(targetPeer, msgList))
+ msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false))
}
}
return msgs, alteredPathList
}
func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
- msgs := make([]*SenderMsg, 0)
-
+ var msgs []*SenderMsg
switch e.MsgType {
case FSM_MSG_STATE_CHANGE:
nextState := e.MsgData.(bgp.FSMState)
@@ -779,7 +775,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
drop = peer.configuredRFlist()
}
peer.DropAll(drop)
- msgs = append(msgs, server.dropPeerAllRoutes(peer, drop)...)
+ msgs = server.dropPeerAllRoutes(peer, drop)
} else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE {
// RFC 4724 4.2
// If the session does not get re-established within the "Restart Time"
@@ -787,12 +783,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
// delete all the stale routes from the peer that it is retaining.
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false
peer.DropAll(peer.configuredRFlist())
-
- msgs = append(msgs, server.dropPeerAllRoutes(peer, peer.configuredRFlist())...)
+ msgs = server.dropPeerAllRoutes(peer, peer.configuredRFlist())
}
close(peer.outgoing)
- peer.outgoing = make(chan *bgp.BGPMessage, 128)
+ peer.outgoing = make(chan *FsmOutgoingMsg, 128)
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
@@ -801,7 +796,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
pathList, _ := peer.getBestFromLocal(peer.configuredRFlist())
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList)))
+ msgs = []*SenderMsg{newSenderMsg(peer, pathList, nil, false)}
}
} else {
// RFC 4724 4.1
@@ -843,13 +838,16 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, oldState)
-
+ case FSM_MSG_ROUTE_REFRESH:
+ if paths := peer.handleRouteRefresh(e); len(paths) > 0 {
+ return []*SenderMsg{newSenderMsg(peer, paths, nil, false)}
+ }
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
case *bgp.MessageError:
- msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)}))
+ return []*SenderMsg{newSenderMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)}
case *bgp.BGPMessage:
- pathList, msgList, eor := peer.handleBGPmessage(e)
+ pathList, eor := peer.handleUpdate(e)
if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
@@ -870,13 +868,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
}
- if len(msgList) > 0 {
- msgs = append(msgs, newSenderMsg(peer, msgList))
- }
-
if len(pathList) > 0 {
- m, altered := server.propagateUpdate(peer, pathList)
- msgs = append(msgs, m...)
+ var altered []*table.Path
+ msgs, altered = server.propagateUpdate(peer, pathList)
if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
@@ -930,10 +924,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
if !p.isGracefulRestartEnabled() {
continue
}
- pathList, _ := p.getBestFromLocal(p.configuredRFlist())
- if len(pathList) > 0 {
- p.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(p, table.CreateUpdateMsgFromPaths(pathList)))
+ paths, _ := p.getBestFromLocal(p.configuredRFlist())
+ if len(paths) > 0 {
+ p.adjRibOut.Update(paths)
+ msgs = append(msgs, newSenderMsg(p, paths, nil, false))
}
}
log.WithFields(log.Fields{
@@ -1970,7 +1964,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
logOp(grpcReq.Name, "Neighbor shutdown")
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
for _, peer := range peers {
- msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m}))
+ msgs = append(msgs, newSenderMsg(peer, nil, m, false))
}
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
@@ -1981,10 +1975,10 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
break
}
logOp(grpcReq.Name, "Neighbor reset")
+ m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
for _, peer := range peers {
peer.fsm.idleHoldTime = peer.conf.Timers.Config.IdleHoldTimeAfterReset
- m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
- msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m}))
+ msgs = append(msgs, newSenderMsg(peer, nil, m, false))
}
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
@@ -2063,7 +2057,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
pathList, filtered := peer.getBestFromLocal(families)
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList)))
+ msgs = append(msgs, newSenderMsg(peer, pathList, nil, false))
}
if grpcReq.RequestType != REQ_DEFERRAL_TIMER_EXPIRED && len(filtered) > 0 {
withdrawnList := make([]*table.Path, 0, len(filtered))
@@ -2079,7 +2073,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
withdrawnList = append(withdrawnList, p.Clone(true))
}
}
- msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(withdrawnList)))
+ msgs = append(msgs, newSenderMsg(peer, withdrawnList, nil, false))
}
}
grpcReq.ResponseCh <- &GrpcResponse{}