summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-11-09 14:14:16 -0800
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-11-09 14:14:16 -0800
commit84d73c1b45006c40764c1587207b5ad67598e893 (patch)
treede2bb447da0d11aef0c589d8fe5fc3e06be71f44 /server
parent431ce6b171a05cd17784a86ea2da1dfbf2f37ea0 (diff)
server: move POLICY_DIRECTION_IN processing from peer to fsm
For parallel processing. Each peer's rx goroutine can process IN policy. Note that RWLock() should be called in looking at policies via grpc too but such operaitons are done in the main goroutine so not called. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/fsm.go13
-rw-r--r--server/fsm_test.go2
-rw-r--r--server/peer.go3
-rw-r--r--server/server.go7
4 files changed, 18 insertions, 7 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 66762ca3..c41aacd5 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -37,9 +37,9 @@ const (
)
type fsmMsg struct {
- MsgType fsmMsgType
- MsgSrc string
- MsgData interface{}
+ MsgType fsmMsgType
+ MsgSrc string
+ MsgData interface{}
PathList []*table.Path
}
@@ -83,6 +83,7 @@ type FSM struct {
rfMap map[bgp.RouteFamily]bool
confedCheck bool
peerInfo *table.PeerInfo
+ peer *Peer
}
func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
@@ -134,7 +135,7 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
}
-func NewFSM(gConf *config.Global, pConf *config.Neighbor) *FSM {
+func NewFSM(gConf *config.Global, pConf *config.Neighbor, peer *Peer) *FSM {
adminState := ADMIN_STATE_UP
if pConf.NeighborState.AdminDown == true {
adminState = ADMIN_STATE_DOWN
@@ -151,6 +152,7 @@ func NewFSM(gConf *config.Global, pConf *config.Neighbor) *FSM {
rfMap: make(map[bgp.RouteFamily]bool),
confedCheck: !config.IsConfederationMember(gConf, pConf) && config.IsEBGPPeer(gConf, pConf),
peerInfo: table.NewPeerInfo(gConf, pConf),
+ peer: peer,
}
fsm.t.Go(fsm.connectLoop)
return fsm
@@ -508,6 +510,9 @@ func (h *FSMHandler) recvMessageWithError() error {
// FIXME: we should use the original message for bmp/mrt
table.UpdatePathAttrs4ByteAs(body)
fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo)
+ policyMutex.RLock()
+ h.fsm.peer.ApplyPolicy(table.POLICY_DIRECTION_IN, fmsg.PathList)
+ policyMutex.RUnlock()
}
fallthrough
case bgp.BGP_MSG_KEEPALIVE:
diff --git a/server/fsm_test.go b/server/fsm_test.go
index 72dfe05f..0ce1a53f 100644
--- a/server/fsm_test.go
+++ b/server/fsm_test.go
@@ -293,7 +293,7 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface),
}
- p.fsm = NewFSM(&gConf, &pConf)
+ p.fsm = NewFSM(&gConf, &pConf, &Peer{})
incoming := make(chan *fsmMsg, 4096)
p.outgoing = make(chan *bgp.BGPMessage, 4096)
diff --git a/server/peer.go b/server/peer.go
index 3009c73e..425d61ee 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -59,7 +59,7 @@ func NewPeer(g config.Global, conf config.Neighbor, loc *table.TableManager) *Pe
conf.NeighborState.SessionState = uint32(bgp.BGP_FSM_IDLE)
conf.Timers.TimersState.Downtime = time.Now().Unix()
peer.adjRib = table.NewAdjRib(peer.configuredRFlist())
- peer.fsm = NewFSM(&g, &conf)
+ peer.fsm = NewFSM(&g, &conf, peer)
return peer
}
@@ -197,7 +197,6 @@ func (peer *Peer) handleBGPmessage(e *fsmMsg) ([]*table.Path, bool, []*bgp.BGPMe
if len(e.PathList) > 0 {
pathList = e.PathList
peer.staleAccepted = true
- peer.ApplyPolicy(table.POLICY_DIRECTION_IN, pathList)
peer.adjRib.UpdateIn(pathList)
}
case bgp.BGP_MSG_NOTIFICATION:
diff --git a/server/server.go b/server/server.go
index 9c160e8b..d6aad66c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -30,6 +30,7 @@ import (
"os"
"strconv"
"strings"
+ "sync"
"time"
)
@@ -37,6 +38,8 @@ const (
GLOBAL_RIB_NAME = "global"
)
+var policyMutex sync.RWMutex
+
type SenderMsg struct {
messages []*bgp.BGPMessage
sendCh chan *bgp.BGPMessage
@@ -2000,6 +2003,8 @@ func (server *BgpServer) policyInUse(x *table.Policy) bool {
}
func (server *BgpServer) handleGrpcModPolicy(grpcReq *GrpcRequest) error {
+ policyMutex.Lock()
+ defer policyMutex.Unlock()
arg := grpcReq.Data.(*api.ModPolicyArguments)
x, err := table.NewPolicyFromApiStruct(arg.Policy, server.policy.DefinedSetMap)
if err != nil {
@@ -2121,6 +2126,8 @@ func (server *BgpServer) handleGrpcModPolicyAssignment(grpcReq *GrpcRequest) err
var err error
var dir table.PolicyDirection
var i policyPoint
+ policyMutex.Lock()
+ defer policyMutex.Unlock()
arg := grpcReq.Data.(*api.ModPolicyAssignmentArguments)
assignment := arg.Assignment
i, dir, err = server.getPolicyInfo(assignment)