summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-04-06 22:44:54 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-04-10 00:04:48 +0900
commit0d2633e7831cb2406bacf62a232fcca7184ff988 (patch)
tree3b3e32f5106e94b100af2f802142b5e1ad69d922 /server
parent3ae8c9170b399ed9b64fe5382fc9a58c5c9f8b6b (diff)
server: serialize in fsm send loop
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/fsm.go38
-rw-r--r--server/fsm_test.go2
-rw-r--r--server/peer.go106
-rw-r--r--server/server.go94
4 files changed, 126 insertions, 114 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 68e27074..48f4e095 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -81,6 +81,7 @@ const (
_ FsmMsgType = iota
FSM_MSG_STATE_CHANGE
FSM_MSG_BGP_MESSAGE
+ FSM_MSG_ROUTE_REFRESH
)
type FsmMsg struct {
@@ -92,6 +93,12 @@ type FsmMsg struct {
payload []byte
}
+type FsmOutgoingMsg struct {
+ Paths []*table.Path
+ Notification *bgp.BGPMessage
+ StayIdle bool
+}
+
const (
HOLDTIME_OPENSENT = 240
HOLDTIME_IDLE = 5
@@ -359,11 +366,11 @@ type FSMHandler struct {
errorCh chan FsmStateReason
incoming *channels.InfiniteChannel
stateCh chan *FsmMsg
- outgoing chan *bgp.BGPMessage
+ outgoing chan *FsmOutgoingMsg
holdTimerResetCh chan bool
}
-func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *FsmOutgoingMsg) *FSMHandler {
h := &FSMHandler{
fsm: fsm,
errorCh: make(chan FsmStateReason, 2),
@@ -612,6 +619,8 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
fmsg.MsgData = m
if h.fsm.state == bgp.BGP_FSM_ESTABLISHED {
switch m.Header.Type {
+ case bgp.BGP_MSG_ROUTE_REFRESH:
+ fmsg.MsgType = FSM_MSG_ROUTE_REFRESH
case bgp.BGP_MSG_UPDATE:
body := m.Body.(*bgp.BGPUpdate)
confedCheck := !config.IsConfederationMember(h.fsm.gConf, h.fsm.pConf) && config.IsEBGPPeer(h.fsm.gConf, h.fsm.pConf)
@@ -1071,14 +1080,26 @@ func (h *FSMHandler) sendMessageloop() error {
}
return nil
case m := <-h.outgoing:
- if err := send(m); err != nil {
- return nil
+ for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths) {
+ if err := send(msg); err != nil {
+ return nil
+ }
+ }
+ if m.Notification != nil {
+ if err := send(m.Notification); err != nil {
+ return nil
+ }
+ if m.StayIdle {
+ select {
+ case h.fsm.adminStateCh <- ADMIN_STATE_DOWN:
+ default:
+ }
+ }
}
case <-ticker.C:
if err := send(bgp.NewBGPKeepAliveMessage()); err != nil {
return nil
}
-
}
}
}
@@ -1146,7 +1167,7 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) {
"data": bgp.BGP_FSM_ESTABLISHED,
}).Warn("hold timer expired")
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil)
- h.outgoing <- m
+ h.outgoing <- &FsmOutgoingMsg{Notification: m}
return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED
case <-h.holdTimerResetCh:
if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 {
@@ -1157,9 +1178,8 @@ func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) {
if err == nil {
switch s {
case ADMIN_STATE_DOWN:
- m := bgp.NewBGPNotificationMessage(
- bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
- h.outgoing <- m
+ m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
+ h.outgoing <- &FsmOutgoingMsg{Notification: m}
}
}
}
diff --git a/server/fsm_test.go b/server/fsm_test.go
index 8d98d751..afe66a0f 100644
--- a/server/fsm_test.go
+++ b/server/fsm_test.go
@@ -294,7 +294,7 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
p.fsm = NewFSM(&gConf, &pConf, table.NewRoutingPolicy())
- p.outgoing = make(chan *bgp.BGPMessage, 4096)
+ p.outgoing = make(chan *FsmOutgoingMsg, 4096)
h := &FSMHandler{
fsm: p.fsm,
diff --git a/server/peer.go b/server/peer.go
index f5491b33..950526dd 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -39,7 +39,7 @@ type Peer struct {
fsm *FSM
adjRibIn *table.AdjRib
adjRibOut *table.AdjRib
- outgoing chan *bgp.BGPMessage
+ outgoing chan *FsmOutgoingMsg
policy *table.RoutingPolicy
localRib *table.TableManager
}
@@ -48,7 +48,7 @@ func NewPeer(g config.Global, conf config.Neighbor, loc *table.TableManager, pol
peer := &Peer{
gConf: g,
conf: conf,
- outgoing: make(chan *bgp.BGPMessage, 128),
+ outgoing: make(chan *FsmOutgoingMsg, 128),
localRib: loc,
policy: policy,
}
@@ -197,68 +197,66 @@ func (peer *Peer) processOutgoingPaths(paths []*table.Path) []*table.Path {
return outgoing
}
-func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage, []bgp.RouteFamily) {
+func (peer *Peer) handleRouteRefresh(e *FsmMsg) []*table.Path {
+ m := e.MsgData.(*bgp.BGPMessage)
+ rr := m.Body.(*bgp.BGPRouteRefresh)
+ rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI)
+ if _, ok := peer.fsm.rfMap[rf]; !ok {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.conf.Config.NeighborAddress,
+ "Data": rf,
+ }).Warn("Route family isn't supported")
+ return nil
+ }
+ if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; !ok {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.conf.Config.NeighborAddress,
+ }).Warn("ROUTE_REFRESH received but the capability wasn't advertised")
+ return nil
+ }
+ rfList := []bgp.RouteFamily{rf}
+ peer.adjRibOut.Drop(rfList)
+ accepted, filtered := peer.getBestFromLocal(rfList)
+ peer.adjRibOut.Update(accepted)
+ for _, path := range filtered {
+ path.IsWithdraw = true
+ accepted = append(accepted, path)
+ }
+ return accepted
+}
+
+func (peer *Peer) handleUpdate(e *FsmMsg) ([]*table.Path, []bgp.RouteFamily) {
m := e.MsgData.(*bgp.BGPMessage)
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.conf.Config.NeighborAddress,
"data": m,
}).Debug("received")
- eor := []bgp.RouteFamily{}
-
- switch m.Header.Type {
- case bgp.BGP_MSG_ROUTE_REFRESH:
- rr := m.Body.(*bgp.BGPRouteRefresh)
- rf := bgp.AfiSafiToRouteFamily(rr.AFI, rr.SAFI)
- if _, ok := peer.fsm.rfMap[rf]; !ok {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": peer.conf.Config.NeighborAddress,
- "Data": rf,
- }).Warn("Route family isn't supported")
- break
- }
- if _, ok := peer.fsm.capMap[bgp.BGP_CAP_ROUTE_REFRESH]; ok {
- rfList := []bgp.RouteFamily{rf}
- peer.adjRibOut.Drop(rfList)
- accepted, filtered := peer.getBestFromLocal(rfList)
- peer.adjRibOut.Update(accepted)
- for _, path := range filtered {
- path.IsWithdraw = true
- accepted = append(accepted, path)
+ peer.conf.Timers.State.UpdateRecvTime = time.Now().Unix()
+ if len(e.PathList) > 0 {
+ peer.adjRibIn.Update(e.PathList)
+ paths := make([]*table.Path, 0, len(e.PathList))
+ eor := []bgp.RouteFamily{}
+ for _, path := range e.PathList {
+ if path.IsEOR() {
+ family := path.GetRouteFamily()
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.conf.Config.NeighborAddress,
+ "AddressFamily": family,
+ }).Debug("EOR received")
+ eor = append(eor, family)
+ continue
}
- return nil, table.CreateUpdateMsgFromPaths(accepted), eor
- } else {
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": peer.conf.Config.NeighborAddress,
- }).Warn("ROUTE_REFRESH received but the capability wasn't advertised")
- }
-
- case bgp.BGP_MSG_UPDATE:
- peer.conf.Timers.State.UpdateRecvTime = time.Now().Unix()
- if len(e.PathList) > 0 {
- peer.adjRibIn.Update(e.PathList)
- paths := make([]*table.Path, 0, len(e.PathList))
- for _, path := range e.PathList {
- if path.IsEOR() {
- family := path.GetRouteFamily()
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": peer.conf.Config.NeighborAddress,
- "AddressFamily": family,
- }).Debug("EOR received")
- eor = append(eor, family)
- continue
- }
- if path.Filtered(peer.ID()) != table.POLICY_DIRECTION_IN {
- paths = append(paths, path)
- }
+ if path.Filtered(peer.ID()) != table.POLICY_DIRECTION_IN {
+ paths = append(paths, path)
}
- return paths, nil, eor
}
+ return paths, eor
}
- return nil, nil, eor
+ return nil, nil
}
func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) {
diff --git a/server/server.go b/server/server.go
index 7f84d5db..af88f3ca 100644
--- a/server/server.go
+++ b/server/server.go
@@ -38,9 +38,8 @@ import (
var policyMutex sync.RWMutex
type SenderMsg struct {
- messages []*bgp.BGPMessage
- sendCh chan *bgp.BGPMessage
- destination string
+ ch chan *FsmOutgoingMsg
+ msg *FsmOutgoingMsg
}
type broadcastMsg interface {
@@ -206,19 +205,17 @@ func (server *BgpServer) Serve() {
senderCh := make(chan *SenderMsg, 1<<16)
go func(ch chan *SenderMsg) {
- for {
- // TODO: must be more clever. Slow peer makes other peers slow too.
- m := <-ch
- w := func(c chan *bgp.BGPMessage, msg *bgp.BGPMessage) {
- // nasty but the peer could already become non established state before here.
- defer func() { recover() }()
- c <- msg
- }
+ w := func(c chan *FsmOutgoingMsg, msg *FsmOutgoingMsg) {
+ // nasty but the peer could already become non established state before here.
+ defer func() { recover() }()
+ c <- msg
+ }
- for _, b := range m.messages {
- w(m.sendCh, b)
- }
+ for m := range ch {
+ // TODO: must be more clever. Slow peer makes other peers slow too.
+ w(m.ch, m.msg)
}
+
}(senderCh)
broadcastCh := make(chan broadcastMsg, 8)
@@ -356,11 +353,14 @@ func (server *BgpServer) Serve() {
}
}
-func newSenderMsg(peer *Peer, messages []*bgp.BGPMessage) *SenderMsg {
+func newSenderMsg(peer *Peer, paths []*table.Path, notification *bgp.BGPMessage, stayIdle bool) *SenderMsg {
return &SenderMsg{
- messages: messages,
- sendCh: peer.outgoing,
- destination: peer.conf.Config.NeighborAddress,
+ ch: peer.outgoing,
+ msg: &FsmOutgoingMsg{
+ Paths: paths,
+ Notification: notification,
+ StayIdle: stayIdle,
+ },
}
}
@@ -451,9 +451,8 @@ func filterpath(peer *Peer, path *table.Path) *table.Path {
}
func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamily) []*SenderMsg {
- msgs := make([]*SenderMsg, 0)
-
ids := make([]string, 0, len(server.neighborMap))
+ msgs := make([]*SenderMsg, 0, len(server.neighborMap))
if peer.isRouteServerClient() {
for _, targetPeer := range server.neighborMap {
if !targetPeer.isRouteServerClient() || targetPeer == peer || targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
@@ -477,8 +476,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
continue
}
if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 {
- msgList := table.CreateUpdateMsgFromPaths(paths)
- msgs = append(msgs, newSenderMsg(targetPeer, msgList))
+ msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false))
}
}
}
@@ -741,22 +739,20 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
}
}
- msgs := make([]*SenderMsg, 0)
+ msgs := make([]*SenderMsg, 0, len(server.neighborMap))
for _, targetPeer := range server.neighborMap {
if (peer == nil && targetPeer.isRouteServerClient()) || (peer != nil && peer.isRouteServerClient() != targetPeer.isRouteServerClient()) {
continue
}
if paths := targetPeer.processOutgoingPaths(best[targetPeer.TableID()]); len(paths) > 0 {
- msgList := table.CreateUpdateMsgFromPaths(paths)
- msgs = append(msgs, newSenderMsg(targetPeer, msgList))
+ msgs = append(msgs, newSenderMsg(targetPeer, paths, nil, false))
}
}
return msgs, alteredPathList
}
func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
- msgs := make([]*SenderMsg, 0)
-
+ var msgs []*SenderMsg
switch e.MsgType {
case FSM_MSG_STATE_CHANGE:
nextState := e.MsgData.(bgp.FSMState)
@@ -779,7 +775,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
drop = peer.configuredRFlist()
}
peer.DropAll(drop)
- msgs = append(msgs, server.dropPeerAllRoutes(peer, drop)...)
+ msgs = server.dropPeerAllRoutes(peer, drop)
} else if peer.fsm.pConf.GracefulRestart.State.PeerRestarting && nextState == bgp.BGP_FSM_IDLE {
// RFC 4724 4.2
// If the session does not get re-established within the "Restart Time"
@@ -787,12 +783,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
// delete all the stale routes from the peer that it is retaining.
peer.fsm.pConf.GracefulRestart.State.PeerRestarting = false
peer.DropAll(peer.configuredRFlist())
-
- msgs = append(msgs, server.dropPeerAllRoutes(peer, peer.configuredRFlist())...)
+ msgs = server.dropPeerAllRoutes(peer, peer.configuredRFlist())
}
close(peer.outgoing)
- peer.outgoing = make(chan *bgp.BGPMessage, 128)
+ peer.outgoing = make(chan *FsmOutgoingMsg, 128)
if nextState == bgp.BGP_FSM_ESTABLISHED {
// update for export policy
laddr, _ := peer.fsm.LocalHostPort()
@@ -801,7 +796,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
pathList, _ := peer.getBestFromLocal(peer.configuredRFlist())
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList)))
+ msgs = []*SenderMsg{newSenderMsg(peer, pathList, nil, false)}
}
} else {
// RFC 4724 4.1
@@ -843,13 +838,16 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
}
peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh)
server.broadcastPeerState(peer, oldState)
-
+ case FSM_MSG_ROUTE_REFRESH:
+ if paths := peer.handleRouteRefresh(e); len(paths) > 0 {
+ return []*SenderMsg{newSenderMsg(peer, paths, nil, false)}
+ }
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
case *bgp.MessageError:
- msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)}))
+ return []*SenderMsg{newSenderMsg(peer, nil, bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data), false)}
case *bgp.BGPMessage:
- pathList, msgList, eor := peer.handleBGPmessage(e)
+ pathList, eor := peer.handleUpdate(e)
if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
@@ -870,13 +868,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
}
- if len(msgList) > 0 {
- msgs = append(msgs, newSenderMsg(peer, msgList))
- }
-
if len(pathList) > 0 {
- m, altered := server.propagateUpdate(peer, pathList)
- msgs = append(msgs, m...)
+ var altered []*table.Path
+ msgs, altered = server.propagateUpdate(peer, pathList)
if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
@@ -930,10 +924,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
if !p.isGracefulRestartEnabled() {
continue
}
- pathList, _ := p.getBestFromLocal(p.configuredRFlist())
- if len(pathList) > 0 {
- p.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(p, table.CreateUpdateMsgFromPaths(pathList)))
+ paths, _ := p.getBestFromLocal(p.configuredRFlist())
+ if len(paths) > 0 {
+ p.adjRibOut.Update(paths)
+ msgs = append(msgs, newSenderMsg(p, paths, nil, false))
}
}
log.WithFields(log.Fields{
@@ -1970,7 +1964,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
logOp(grpcReq.Name, "Neighbor shutdown")
m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
for _, peer := range peers {
- msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m}))
+ msgs = append(msgs, newSenderMsg(peer, nil, m, false))
}
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
@@ -1981,10 +1975,10 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
break
}
logOp(grpcReq.Name, "Neighbor reset")
+ m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
for _, peer := range peers {
peer.fsm.idleHoldTime = peer.conf.Timers.Config.IdleHoldTimeAfterReset
- m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
- msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{m}))
+ msgs = append(msgs, newSenderMsg(peer, nil, m, false))
}
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
@@ -2063,7 +2057,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
pathList, filtered := peer.getBestFromLocal(families)
if len(pathList) > 0 {
peer.adjRibOut.Update(pathList)
- msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(pathList)))
+ msgs = append(msgs, newSenderMsg(peer, pathList, nil, false))
}
if grpcReq.RequestType != REQ_DEFERRAL_TIMER_EXPIRED && len(filtered) > 0 {
withdrawnList := make([]*table.Path, 0, len(filtered))
@@ -2079,7 +2073,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
withdrawnList = append(withdrawnList, p.Clone(true))
}
}
- msgs = append(msgs, newSenderMsg(peer, table.CreateUpdateMsgFromPaths(withdrawnList)))
+ msgs = append(msgs, newSenderMsg(peer, withdrawnList, nil, false))
}
}
grpcReq.ResponseCh <- &GrpcResponse{}