diff options
-rw-r--r-- | server/fsm.go | 28 | ||||
-rw-r--r-- | server/fsm_test.go | 4 | ||||
-rw-r--r-- | server/peer.go | 3 | ||||
-rw-r--r-- | server/server.go | 13 |
4 files changed, 32 insertions, 16 deletions
diff --git a/server/fsm.go b/server/fsm.go index 01fc01c4..94ee588e 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -18,6 +18,7 @@ package server import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" @@ -356,15 +357,15 @@ type FSMHandler struct { t tomb.Tomb fsm *FSM conn net.Conn - msgCh chan *FsmMsg + msgCh *channels.InfiniteChannel errorCh chan FsmStateReason - incoming chan *FsmMsg + incoming *channels.InfiniteChannel stateCh chan *FsmMsg outgoing chan *bgp.BGPMessage holdTimerResetCh chan bool } -func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { +func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan FsmStateReason, 2), @@ -671,9 +672,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { } func (h *FSMHandler) recvMessage() error { + defer h.msgCh.Close() fmsg, _ := h.recvMessageWithError() if fmsg != nil { - h.msgCh <- fmsg + h.msgCh.In() <- fmsg } return nil } @@ -719,7 +721,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.msgCh = make(chan *FsmMsg) + h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -754,7 +756,11 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } - case e := <-h.msgCh: + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) @@ -885,7 +891,7 @@ func keepaliveTicker(fsm *FSM) *time.Ticker { func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) - h.msgCh = make(chan *FsmMsg) + h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -929,7 +935,11 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { // TODO: check error fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - case e := <-h.msgCh: + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) @@ -1067,7 +1077,7 @@ func (h *FSMHandler) recvMessageloop() error { for { fmsg, err := h.recvMessageWithError() if fmsg != nil { - h.msgCh <- fmsg + h.msgCh.In() <- fmsg } if err != nil { return nil diff --git a/server/fsm_test.go b/server/fsm_test.go index 459c4183..2cfce644 100644 --- a/server/fsm_test.go +++ b/server/fsm_test.go @@ -18,6 +18,7 @@ package server import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" @@ -293,13 +294,12 @@ func makePeerAndHandler() (*Peer, *FSMHandler) { p.fsm = NewFSM(&gConf, &pConf, table.NewRoutingPolicy()) - incoming := make(chan *FsmMsg, 4096) p.outgoing = make(chan *bgp.BGPMessage, 4096) h := &FSMHandler{ fsm: p.fsm, errorCh: make(chan FsmStateReason, 2), - incoming: incoming, + incoming: channels.NewInfiniteChannel(), outgoing: p.outgoing, } diff --git a/server/peer.go b/server/peer.go index da305894..57994bcd 100644 --- a/server/peer.go +++ b/server/peer.go @@ -18,6 +18,7 @@ package server import ( "encoding/json" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" @@ -228,7 +229,7 @@ func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage, return nil, nil, eor } -func (peer *Peer) startFSMHandler(incoming, stateCh chan *FsmMsg) { +func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) { peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing) } diff --git a/server/server.go b/server/server.go index a24cea26..a7245198 100644 --- a/server/server.go +++ b/server/server.go @@ -20,6 +20,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" "github.com/armon/go-radix" + "github.com/eapache/channels" api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" @@ -93,7 +94,7 @@ type BgpServer struct { addedPeerCh chan config.Neighbor deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor - fsmincomingCh chan *FsmMsg + fsmincomingCh *channels.InfiniteChannel fsmStateCh chan *FsmMsg rpkiConfigCh chan []config.RpkiServer @@ -296,7 +297,7 @@ func (server *BgpServer) Serve() { } } - server.fsmincomingCh = make(chan *FsmMsg, 4096) + server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) var senderMsgs []*SenderMsg @@ -465,8 +466,11 @@ func (server *BgpServer) Serve() { peer := server.neighborMap[addr] peer.conf = config server.setPolicyByConfig(peer.ID(), config.ApplyPolicy) - case e := <-server.fsmincomingCh: - handleFsmMsg(e) + case e, ok := <-server.fsmincomingCh.Out(): + if !ok { + continue + } + handleFsmMsg(e.(*FsmMsg)) case e := <-server.fsmStateCh: handleFsmMsg(e) case sCh <- firstMsg: @@ -1180,6 +1184,7 @@ func (server *BgpServer) Shutdown() { for _, p := range server.neighborMap { p.fsm.adminStateCh <- ADMIN_STATE_DOWN } + // TODO: call fsmincomingCh.Close() } func (server *BgpServer) UpdatePolicy(policy config.RoutingPolicy) { |