summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/fsm.go28
-rw-r--r--server/fsm_test.go4
-rw-r--r--server/peer.go3
-rw-r--r--server/server.go13
4 files changed, 32 insertions, 16 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 01fc01c4..94ee588e 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -18,6 +18,7 @@ package server
import (
"fmt"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
@@ -356,15 +357,15 @@ type FSMHandler struct {
t tomb.Tomb
fsm *FSM
conn net.Conn
- msgCh chan *FsmMsg
+ msgCh *channels.InfiniteChannel
errorCh chan FsmStateReason
- incoming chan *FsmMsg
+ incoming *channels.InfiniteChannel
stateCh chan *FsmMsg
outgoing chan *bgp.BGPMessage
holdTimerResetCh chan bool
}
-func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
h := &FSMHandler{
fsm: fsm,
errorCh: make(chan FsmStateReason, 2),
@@ -671,9 +672,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
}
func (h *FSMHandler) recvMessage() error {
+ defer h.msgCh.Close()
fmsg, _ := h.recvMessageWithError()
if fmsg != nil {
- h.msgCh <- fmsg
+ h.msgCh.In() <- fmsg
}
return nil
}
@@ -719,7 +721,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) {
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- h.msgCh = make(chan *FsmMsg)
+ h.msgCh = channels.NewInfiniteChannel()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -754,7 +756,11 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) {
}).Warn("graceful restart timer expired")
return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED
}
- case e := <-h.msgCh:
+ case i, ok := <-h.msgCh.Out():
+ if !ok {
+ continue
+ }
+ e := i.(*FsmMsg)
switch e.MsgData.(type) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
@@ -885,7 +891,7 @@ func keepaliveTicker(fsm *FSM) *time.Ticker {
func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
- h.msgCh = make(chan *FsmMsg)
+ h.msgCh = channels.NewInfiniteChannel()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -929,7 +935,11 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) {
// TODO: check error
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- case e := <-h.msgCh:
+ case i, ok := <-h.msgCh.Out():
+ if !ok {
+ continue
+ }
+ e := i.(*FsmMsg)
switch e.MsgData.(type) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
@@ -1067,7 +1077,7 @@ func (h *FSMHandler) recvMessageloop() error {
for {
fmsg, err := h.recvMessageWithError()
if fmsg != nil {
- h.msgCh <- fmsg
+ h.msgCh.In() <- fmsg
}
if err != nil {
return nil
diff --git a/server/fsm_test.go b/server/fsm_test.go
index 459c4183..2cfce644 100644
--- a/server/fsm_test.go
+++ b/server/fsm_test.go
@@ -18,6 +18,7 @@ package server
import (
"fmt"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
@@ -293,13 +294,12 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
p.fsm = NewFSM(&gConf, &pConf, table.NewRoutingPolicy())
- incoming := make(chan *FsmMsg, 4096)
p.outgoing = make(chan *bgp.BGPMessage, 4096)
h := &FSMHandler{
fsm: p.fsm,
errorCh: make(chan FsmStateReason, 2),
- incoming: incoming,
+ incoming: channels.NewInfiniteChannel(),
outgoing: p.outgoing,
}
diff --git a/server/peer.go b/server/peer.go
index da305894..57994bcd 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -18,6 +18,7 @@ package server
import (
"encoding/json"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
@@ -228,7 +229,7 @@ func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage,
return nil, nil, eor
}
-func (peer *Peer) startFSMHandler(incoming, stateCh chan *FsmMsg) {
+func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) {
peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
}
diff --git a/server/server.go b/server/server.go
index a24cea26..a7245198 100644
--- a/server/server.go
+++ b/server/server.go
@@ -20,6 +20,7 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/armon/go-radix"
+ "github.com/eapache/channels"
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
@@ -93,7 +94,7 @@ type BgpServer struct {
addedPeerCh chan config.Neighbor
deletedPeerCh chan config.Neighbor
updatedPeerCh chan config.Neighbor
- fsmincomingCh chan *FsmMsg
+ fsmincomingCh *channels.InfiniteChannel
fsmStateCh chan *FsmMsg
rpkiConfigCh chan []config.RpkiServer
@@ -296,7 +297,7 @@ func (server *BgpServer) Serve() {
}
}
- server.fsmincomingCh = make(chan *FsmMsg, 4096)
+ server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
var senderMsgs []*SenderMsg
@@ -465,8 +466,11 @@ func (server *BgpServer) Serve() {
peer := server.neighborMap[addr]
peer.conf = config
server.setPolicyByConfig(peer.ID(), config.ApplyPolicy)
- case e := <-server.fsmincomingCh:
- handleFsmMsg(e)
+ case e, ok := <-server.fsmincomingCh.Out():
+ if !ok {
+ continue
+ }
+ handleFsmMsg(e.(*FsmMsg))
case e := <-server.fsmStateCh:
handleFsmMsg(e)
case sCh <- firstMsg:
@@ -1180,6 +1184,7 @@ func (server *BgpServer) Shutdown() {
for _, p := range server.neighborMap {
p.fsm.adminStateCh <- ADMIN_STATE_DOWN
}
+ // TODO: call fsmincomingCh.Close()
}
func (server *BgpServer) UpdatePolicy(policy config.RoutingPolicy) {