diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-11-09 14:14:16 -0800 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-11-09 14:14:16 -0800 |
commit | 84d73c1b45006c40764c1587207b5ad67598e893 (patch) | |
tree | de2bb447da0d11aef0c589d8fe5fc3e06be71f44 /server | |
parent | 431ce6b171a05cd17784a86ea2da1dfbf2f37ea0 (diff) |
server: move POLICY_DIRECTION_IN processing from peer to fsm
For parallel processing. Each peer's rx goroutine can process IN
policy.
Note that RWLock() should be called in looking at policies via grpc
too but such operaitons are done in the main goroutine so not called.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/fsm.go | 13 | ||||
-rw-r--r-- | server/fsm_test.go | 2 | ||||
-rw-r--r-- | server/peer.go | 3 | ||||
-rw-r--r-- | server/server.go | 7 |
4 files changed, 18 insertions, 7 deletions
diff --git a/server/fsm.go b/server/fsm.go index 66762ca3..c41aacd5 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -37,9 +37,9 @@ const ( ) type fsmMsg struct { - MsgType fsmMsgType - MsgSrc string - MsgData interface{} + MsgType fsmMsgType + MsgSrc string + MsgData interface{} PathList []*table.Path } @@ -83,6 +83,7 @@ type FSM struct { rfMap map[bgp.RouteFamily]bool confedCheck bool peerInfo *table.PeerInfo + peer *Peer } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -134,7 +135,7 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } } -func NewFSM(gConf *config.Global, pConf *config.Neighbor) *FSM { +func NewFSM(gConf *config.Global, pConf *config.Neighbor, peer *Peer) *FSM { adminState := ADMIN_STATE_UP if pConf.NeighborState.AdminDown == true { adminState = ADMIN_STATE_DOWN @@ -151,6 +152,7 @@ func NewFSM(gConf *config.Global, pConf *config.Neighbor) *FSM { rfMap: make(map[bgp.RouteFamily]bool), confedCheck: !config.IsConfederationMember(gConf, pConf) && config.IsEBGPPeer(gConf, pConf), peerInfo: table.NewPeerInfo(gConf, pConf), + peer: peer, } fsm.t.Go(fsm.connectLoop) return fsm @@ -508,6 +510,9 @@ func (h *FSMHandler) recvMessageWithError() error { // FIXME: we should use the original message for bmp/mrt table.UpdatePathAttrs4ByteAs(body) fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo) + policyMutex.RLock() + h.fsm.peer.ApplyPolicy(table.POLICY_DIRECTION_IN, fmsg.PathList) + policyMutex.RUnlock() } fallthrough case bgp.BGP_MSG_KEEPALIVE: diff --git a/server/fsm_test.go b/server/fsm_test.go index 72dfe05f..0ce1a53f 100644 --- a/server/fsm_test.go +++ b/server/fsm_test.go @@ -293,7 +293,7 @@ func makePeerAndHandler() (*Peer, *FSMHandler) { capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface), } - p.fsm = NewFSM(&gConf, &pConf) + p.fsm = NewFSM(&gConf, &pConf, &Peer{}) incoming := make(chan *fsmMsg, 4096) p.outgoing = make(chan *bgp.BGPMessage, 4096) diff --git a/server/peer.go b/server/peer.go index 3009c73e..425d61ee 100644 --- a/server/peer.go +++ b/server/peer.go @@ -59,7 +59,7 @@ func NewPeer(g config.Global, conf config.Neighbor, loc *table.TableManager) *Pe conf.NeighborState.SessionState = uint32(bgp.BGP_FSM_IDLE) conf.Timers.TimersState.Downtime = time.Now().Unix() peer.adjRib = table.NewAdjRib(peer.configuredRFlist()) - peer.fsm = NewFSM(&g, &conf) + peer.fsm = NewFSM(&g, &conf, peer) return peer } @@ -197,7 +197,6 @@ func (peer *Peer) handleBGPmessage(e *fsmMsg) ([]*table.Path, bool, []*bgp.BGPMe if len(e.PathList) > 0 { pathList = e.PathList peer.staleAccepted = true - peer.ApplyPolicy(table.POLICY_DIRECTION_IN, pathList) peer.adjRib.UpdateIn(pathList) } case bgp.BGP_MSG_NOTIFICATION: diff --git a/server/server.go b/server/server.go index 9c160e8b..d6aad66c 100644 --- a/server/server.go +++ b/server/server.go @@ -30,6 +30,7 @@ import ( "os" "strconv" "strings" + "sync" "time" ) @@ -37,6 +38,8 @@ const ( GLOBAL_RIB_NAME = "global" ) +var policyMutex sync.RWMutex + type SenderMsg struct { messages []*bgp.BGPMessage sendCh chan *bgp.BGPMessage @@ -2000,6 +2003,8 @@ func (server *BgpServer) policyInUse(x *table.Policy) bool { } func (server *BgpServer) handleGrpcModPolicy(grpcReq *GrpcRequest) error { + policyMutex.Lock() + defer policyMutex.Unlock() arg := grpcReq.Data.(*api.ModPolicyArguments) x, err := table.NewPolicyFromApiStruct(arg.Policy, server.policy.DefinedSetMap) if err != nil { @@ -2121,6 +2126,8 @@ func (server *BgpServer) handleGrpcModPolicyAssignment(grpcReq *GrpcRequest) err var err error var dir table.PolicyDirection var i policyPoint + policyMutex.Lock() + defer policyMutex.Unlock() arg := grpcReq.Data.(*api.ModPolicyAssignmentArguments) assignment := arg.Assignment i, dir, err = server.getPolicyInfo(assignment) |