diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-01-18 10:25:36 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-01-18 10:25:36 +0900 |
commit | a9e55d3bd75fd5dd633fee1ff2c9d45d6d6447ee (patch) | |
tree | 792429269f168a34df4ca78baef5909b072c200c /server/peer.go | |
parent | ac54c58e9d5466804f67c9aca32a23beaeb6882d (diff) |
server: move outgoing and incoming channels to FSMHandler
This patch solves the problem that old messages in outgoing will be
sent in a new established session.
Peer goroutine puts some messagages in the outgoing channel. Then the
state changes from established. In such case, once the state becomes
established, the "stale" messages in the outgoing channel will be sent.
With this patch, new outgoing and incoming channels are created at
every state change. No more "stale" messages in the channles.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/peer.go')
-rw-r--r-- | server/peer.go | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/server/peer.go b/server/peer.go index f928fc10..79209983 100644 --- a/server/peer.go +++ b/server/peer.go @@ -28,7 +28,8 @@ import ( ) const ( - FLOP_THRESHOLD = time.Second * 30 + FSM_CHANNEL_LENGTH = 1024 + FLOP_THRESHOLD = time.Second * 30 ) type peerMsgType int @@ -49,8 +50,6 @@ type Peer struct { globalConfig config.GlobalType peerConfig config.NeighborType acceptedConnCh chan *net.TCPConn - incoming chan *fsmMsg - outgoing chan *bgp.BGPMessage serverMsgCh chan *serverMsg peerMsgCh chan *peerMsg fsm *FSM @@ -63,6 +62,7 @@ type Peer struct { capMap map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface peerInfo *table.PeerInfo siblings map[string]*serverMsgDataPeer + outgoing chan *bgp.BGPMessage } func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer) *Peer { @@ -70,8 +70,6 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *se globalConfig: g, peerConfig: peer, acceptedConnCh: make(chan *net.TCPConn), - incoming: make(chan *fsmMsg, 4096), - outgoing: make(chan *bgp.BGPMessage, 4096), serverMsgCh: serverMsgCh, peerMsgCh: peerMsgCh, capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), @@ -80,7 +78,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *se for _, s := range peerList { p.siblings[s.address.String()] = s } - p.fsm = NewFSM(&g, &peer, p.acceptedConnCh, p.incoming, p.outgoing) + p.fsm = NewFSM(&g, &peer, p.acceptedConnCh) peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE) peer.BgpNeighborCommonState.Downtime = time.Now() if peer.NeighborAddress.To4() != nil { @@ -178,15 +176,15 @@ func (peer *Peer) handleREST(restReq *api.RestRequest) { j, _ := json.Marshal(peer.rib.Tables[peer.rf]) result.Data = j case api.REQ_NEIGHBOR_SHUTDOWN: - peer.fsm.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) + peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) case api.REQ_NEIGHBOR_RESET: - peer.fsm.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) + peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET, nil) case api.REQ_NEIGHBOR_SOFT_RESET: case api.REQ_NEIGHBOR_SOFT_RESET_IN: // check capability // drop allIn and other peers? // peer.adjRib.DropAllIn(peer.rf) - peer.fsm.outgoing <- bgp.NewBGPRouteRefreshMessage(uint16(int(peer.rf)>>16), 0, uint8(int(peer.rf)&0xff)) + peer.outgoing <- bgp.NewBGPRouteRefreshMessage(uint16(int(peer.rf)>>16), 0, uint8(int(peer.rf)&0xff)) case api.REQ_NEIGHBOR_SOFT_RESET_OUT: pathList := peer.adjRib.GetOutPathList(peer.rf) peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) @@ -258,18 +256,19 @@ func (peer *Peer) handleServerMsg(m *serverMsg) { // this goroutine handles routing table operations func (peer *Peer) loop() error { for { - h := NewFSMHandler(peer.fsm) + incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH) + peer.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH) + + h := NewFSMHandler(peer.fsm, incoming, peer.outgoing) sameState := true for sameState { select { case <-peer.t.Dying(): close(peer.acceptedConnCh) - h.fsm.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil) + peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil) h.Wait() - close(peer.incoming) - close(peer.outgoing) return nil - case e := <-peer.incoming: + case e := <-incoming: switch e.MsgType { case FSM_MSG_STATE_CHANGE: nextState := e.MsgData.(bgp.FSMState) @@ -303,7 +302,7 @@ func (peer *Peer) loop() error { case FSM_MSG_BGP_MESSAGE: switch m := e.MsgData.(type) { case *bgp.MessageError: - h.fsm.outgoing <- bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data) + peer.outgoing <- bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data) case *bgp.BGPMessage: peer.handleBGPmessage(m) default: |