diff options
Diffstat (limited to 'server/fsm.go')
-rw-r--r-- | server/fsm.go | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/server/fsm.go b/server/fsm.go index 01fc01c4..94ee588e 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -18,6 +18,7 @@ package server import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" @@ -356,15 +357,15 @@ type FSMHandler struct { t tomb.Tomb fsm *FSM conn net.Conn - msgCh chan *FsmMsg + msgCh *channels.InfiniteChannel errorCh chan FsmStateReason - incoming chan *FsmMsg + incoming *channels.InfiniteChannel stateCh chan *FsmMsg outgoing chan *bgp.BGPMessage holdTimerResetCh chan bool } -func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { +func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan FsmStateReason, 2), @@ -671,9 +672,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { } func (h *FSMHandler) recvMessage() error { + defer h.msgCh.Close() fmsg, _ := h.recvMessageWithError() if fmsg != nil { - h.msgCh <- fmsg + h.msgCh.In() <- fmsg } return nil } @@ -719,7 +721,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.msgCh = make(chan *FsmMsg) + h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -754,7 +756,11 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } - case e := <-h.msgCh: + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) @@ -885,7 +891,7 @@ func keepaliveTicker(fsm *FSM) *time.Ticker { func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) - h.msgCh = make(chan *FsmMsg) + h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -929,7 +935,11 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { // TODO: check error fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - case e := <-h.msgCh: + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) @@ -1067,7 +1077,7 @@ func (h *FSMHandler) recvMessageloop() error { for { fmsg, err := h.recvMessageWithError() if fmsg != nil { - h.msgCh <- fmsg + h.msgCh.In() <- fmsg } if err != nil { return nil |