summaryrefslogtreecommitdiffhomepage
path: root/server/fsm.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/fsm.go')
-rw-r--r--server/fsm.go28
1 files changed, 19 insertions, 9 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 01fc01c4..94ee588e 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -18,6 +18,7 @@ package server
import (
"fmt"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
@@ -356,15 +357,15 @@ type FSMHandler struct {
t tomb.Tomb
fsm *FSM
conn net.Conn
- msgCh chan *FsmMsg
+ msgCh *channels.InfiniteChannel
errorCh chan FsmStateReason
- incoming chan *FsmMsg
+ incoming *channels.InfiniteChannel
stateCh chan *FsmMsg
outgoing chan *bgp.BGPMessage
holdTimerResetCh chan bool
}
-func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
h := &FSMHandler{
fsm: fsm,
errorCh: make(chan FsmStateReason, 2),
@@ -671,9 +672,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
}
func (h *FSMHandler) recvMessage() error {
+ defer h.msgCh.Close()
fmsg, _ := h.recvMessageWithError()
if fmsg != nil {
- h.msgCh <- fmsg
+ h.msgCh.In() <- fmsg
}
return nil
}
@@ -719,7 +721,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) {
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- h.msgCh = make(chan *FsmMsg)
+ h.msgCh = channels.NewInfiniteChannel()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -754,7 +756,11 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) {
}).Warn("graceful restart timer expired")
return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED
}
- case e := <-h.msgCh:
+ case i, ok := <-h.msgCh.Out():
+ if !ok {
+ continue
+ }
+ e := i.(*FsmMsg)
switch e.MsgData.(type) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
@@ -885,7 +891,7 @@ func keepaliveTicker(fsm *FSM) *time.Ticker {
func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
- h.msgCh = make(chan *FsmMsg)
+ h.msgCh = channels.NewInfiniteChannel()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -929,7 +935,11 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) {
// TODO: check error
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- case e := <-h.msgCh:
+ case i, ok := <-h.msgCh.Out():
+ if !ok {
+ continue
+ }
+ e := i.(*FsmMsg)
switch e.MsgData.(type) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
@@ -1067,7 +1077,7 @@ func (h *FSMHandler) recvMessageloop() error {
for {
fmsg, err := h.recvMessageWithError()
if fmsg != nil {
- h.msgCh <- fmsg
+ h.msgCh.In() <- fmsg
}
if err != nil {
return nil