summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-01-18 10:25:36 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-01-18 10:25:36 +0900
commita9e55d3bd75fd5dd633fee1ff2c9d45d6d6447ee (patch)
tree792429269f168a34df4ca78baef5909b072c200c /server
parentac54c58e9d5466804f67c9aca32a23beaeb6882d (diff)
server: move outgoing and incoming channels to FSMHandler
This patch solves the problem that old messages in outgoing will be sent in a new established session. Peer goroutine puts some messagages in the outgoing channel. Then the state changes from established. In such case, once the state becomes established, the "stale" messages in the outgoing channel will be sent. With this patch, new outgoing and incoming channels are created at every state change. No more "stale" messages in the channles. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/fsm.go34
-rw-r--r--server/peer.go29
2 files changed, 31 insertions, 32 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 606ebbf9..af6718ca 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -42,8 +42,6 @@ type FSM struct {
peerConfig *config.NeighborType
keepaliveTicker *time.Ticker
state bgp.FSMState
- incoming chan *fsmMsg
- outgoing chan *bgp.BGPMessage
passiveConn *net.TCPConn
passiveConnCh chan *net.TCPConn
}
@@ -90,12 +88,10 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) {
}
}
-func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSM {
+func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn) *FSM {
return &FSM{
globalConfig: gConfig,
peerConfig: pConfig,
- incoming: incoming,
- outgoing: outgoing,
state: bgp.BGP_FSM_IDLE,
passiveConnCh: connCh,
}
@@ -112,17 +108,21 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) {
}
type FSMHandler struct {
- t tomb.Tomb
- fsm *FSM
- conn *net.TCPConn
- msgCh chan *fsmMsg
- errorCh chan bool
+ t tomb.Tomb
+ fsm *FSM
+ conn *net.TCPConn
+ msgCh chan *fsmMsg
+ errorCh chan bool
+ incoming chan *fsmMsg
+ outgoing chan *bgp.BGPMessage
}
-func NewFSMHandler(fsm *FSM) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
f := &FSMHandler{
- fsm: fsm,
- errorCh: make(chan bool, 2),
+ fsm: fsm,
+ errorCh: make(chan bool, 2),
+ incoming: incoming,
+ outgoing: outgoing,
}
f.t.Go(f.loop)
return f
@@ -280,7 +280,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
MsgType: FSM_MSG_BGP_MESSAGE,
MsgData: m,
}
- fsm.incoming <- e
+ h.incoming <- e
msg := bgp.NewBGPKeepAliveMessage()
b, _ := msg.Serialize()
fsm.passiveConn.Write(b)
@@ -376,7 +376,7 @@ func (h *FSMHandler) sendMessageloop() error {
select {
case <-h.t.Dying():
return nil
- case m := <-fsm.outgoing:
+ case m := <-h.outgoing:
b, _ := m.Serialize()
_, err := conn.Write(b)
if err != nil {
@@ -420,7 +420,7 @@ func (h *FSMHandler) established() bgp.FSMState {
fsm := h.fsm
h.conn = fsm.passiveConn
h.t.Go(h.sendMessageloop)
- h.msgCh = fsm.incoming
+ h.msgCh = h.incoming
h.t.Go(h.recvMessageloop)
for {
@@ -461,7 +461,7 @@ func (h *FSMHandler) loop() error {
MsgType: FSM_MSG_STATE_CHANGE,
MsgData: nextState,
}
- fsm.incoming <- e
+ h.incoming <- e
}
return nil
}
diff --git a/server/peer.go b/server/peer.go
index f928fc10..79209983 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -28,7 +28,8 @@ import (
)
const (
- FLOP_THRESHOLD = time.Second * 30
+ FSM_CHANNEL_LENGTH = 1024
+ FLOP_THRESHOLD = time.Second * 30
)
type peerMsgType int
@@ -49,8 +50,6 @@ type Peer struct {
globalConfig config.GlobalType
peerConfig config.NeighborType
acceptedConnCh chan *net.TCPConn
- incoming chan *fsmMsg
- outgoing chan *bgp.BGPMessage
serverMsgCh chan *serverMsg
peerMsgCh chan *peerMsg
fsm *FSM
@@ -63,6 +62,7 @@ type Peer struct {
capMap map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface
peerInfo *table.PeerInfo
siblings map[string]*serverMsgDataPeer
+ outgoing chan *bgp.BGPMessage
}
func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer) *Peer {
@@ -70,8 +70,6 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *se
globalConfig: g,
peerConfig: peer,
acceptedConnCh: make(chan *net.TCPConn),
- incoming: make(chan *fsmMsg, 4096),
- outgoing: make(chan *bgp.BGPMessage, 4096),
serverMsgCh: serverMsgCh,
peerMsgCh: peerMsgCh,
capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
@@ -80,7 +78,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *se
for _, s := range peerList {
p.siblings[s.address.String()] = s
}
- p.fsm = NewFSM(&g, &peer, p.acceptedConnCh, p.incoming, p.outgoing)
+ p.fsm = NewFSM(&g, &peer, p.acceptedConnCh)
peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE)
peer.BgpNeighborCommonState.Downtime = time.Now()
if peer.NeighborAddress.To4() != nil {
@@ -178,15 +176,15 @@ func (peer *Peer) handleREST(restReq *api.RestRequest) {
j, _ := json.Marshal(peer.rib.Tables[peer.rf])
result.Data = j
case api.REQ_NEIGHBOR_SHUTDOWN:
- peer.fsm.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
+ peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil)
case api.REQ_NEIGHBOR_RESET:
- peer.fsm.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
+ peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil)
case api.REQ_NEIGHBOR_SOFT_RESET:
case api.REQ_NEIGHBOR_SOFT_RESET_IN:
// check capability
// drop allIn and other peers?
// peer.adjRib.DropAllIn(peer.rf)
- peer.fsm.outgoing <- bgp.NewBGPRouteRefreshMessage(uint16(int(peer.rf)>>16), 0, uint8(int(peer.rf)&0xff))
+ peer.outgoing <- bgp.NewBGPRouteRefreshMessage(uint16(int(peer.rf)>>16), 0, uint8(int(peer.rf)&0xff))
case api.REQ_NEIGHBOR_SOFT_RESET_OUT:
pathList := peer.adjRib.GetOutPathList(peer.rf)
peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
@@ -258,18 +256,19 @@ func (peer *Peer) handleServerMsg(m *serverMsg) {
// this goroutine handles routing table operations
func (peer *Peer) loop() error {
for {
- h := NewFSMHandler(peer.fsm)
+ incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH)
+ peer.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH)
+
+ h := NewFSMHandler(peer.fsm, incoming, peer.outgoing)
sameState := true
for sameState {
select {
case <-peer.t.Dying():
close(peer.acceptedConnCh)
- h.fsm.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)
+ peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)
h.Wait()
- close(peer.incoming)
- close(peer.outgoing)
return nil
- case e := <-peer.incoming:
+ case e := <-incoming:
switch e.MsgType {
case FSM_MSG_STATE_CHANGE:
nextState := e.MsgData.(bgp.FSMState)
@@ -303,7 +302,7 @@ func (peer *Peer) loop() error {
case FSM_MSG_BGP_MESSAGE:
switch m := e.MsgData.(type) {
case *bgp.MessageError:
- h.fsm.outgoing <- bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)
+ peer.outgoing <- bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)
case *bgp.BGPMessage:
peer.handleBGPmessage(m)
default: