summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-03-06 07:36:20 -0800
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-03-07 00:08:04 -0800
commit0561c11889edab709a51fff7dc82bf25044f51bb (patch)
tree60a06d5b2160943096b36952e70f988ba5600cbc
parent74c79d99a87ce626cb7fee2e97396daa822a2514 (diff)
server: make rx goroutine reading from socket never sleep
Currently, the rx goroutine reading from socket (recvMessageloop funciton) sleeps if msgCh is full. The problem is that if the rx goroutine stops reading from a socket, keepalives are ignored, the holdtime on gobgp expires even if a peer properly sends keepalives. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/fsm.go28
-rw-r--r--server/fsm_test.go4
-rw-r--r--server/peer.go3
-rw-r--r--server/server.go13
4 files changed, 32 insertions, 16 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 01fc01c4..94ee588e 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -18,6 +18,7 @@ package server
import (
"fmt"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
@@ -356,15 +357,15 @@ type FSMHandler struct {
t tomb.Tomb
fsm *FSM
conn net.Conn
- msgCh chan *FsmMsg
+ msgCh *channels.InfiniteChannel
errorCh chan FsmStateReason
- incoming chan *FsmMsg
+ incoming *channels.InfiniteChannel
stateCh chan *FsmMsg
outgoing chan *bgp.BGPMessage
holdTimerResetCh chan bool
}
-func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
+func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler {
h := &FSMHandler{
fsm: fsm,
errorCh: make(chan FsmStateReason, 2),
@@ -671,9 +672,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) {
}
func (h *FSMHandler) recvMessage() error {
+ defer h.msgCh.Close()
fmsg, _ := h.recvMessageWithError()
if fmsg != nil {
- h.msgCh <- fmsg
+ h.msgCh.In() <- fmsg
}
return nil
}
@@ -719,7 +721,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) {
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- h.msgCh = make(chan *FsmMsg)
+ h.msgCh = channels.NewInfiniteChannel()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -754,7 +756,11 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) {
}).Warn("graceful restart timer expired")
return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED
}
- case e := <-h.msgCh:
+ case i, ok := <-h.msgCh.Out():
+ if !ok {
+ continue
+ }
+ e := i.(*FsmMsg)
switch e.MsgData.(type) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
@@ -885,7 +891,7 @@ func keepaliveTicker(fsm *FSM) *time.Ticker {
func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
- h.msgCh = make(chan *FsmMsg)
+ h.msgCh = channels.NewInfiniteChannel()
h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -929,7 +935,11 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) {
// TODO: check error
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- case e := <-h.msgCh:
+ case i, ok := <-h.msgCh.Out():
+ if !ok {
+ continue
+ }
+ e := i.(*FsmMsg)
switch e.MsgData.(type) {
case *bgp.BGPMessage:
m := e.MsgData.(*bgp.BGPMessage)
@@ -1067,7 +1077,7 @@ func (h *FSMHandler) recvMessageloop() error {
for {
fmsg, err := h.recvMessageWithError()
if fmsg != nil {
- h.msgCh <- fmsg
+ h.msgCh.In() <- fmsg
}
if err != nil {
return nil
diff --git a/server/fsm_test.go b/server/fsm_test.go
index 459c4183..2cfce644 100644
--- a/server/fsm_test.go
+++ b/server/fsm_test.go
@@ -18,6 +18,7 @@ package server
import (
"fmt"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/table"
@@ -293,13 +294,12 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
p.fsm = NewFSM(&gConf, &pConf, table.NewRoutingPolicy())
- incoming := make(chan *FsmMsg, 4096)
p.outgoing = make(chan *bgp.BGPMessage, 4096)
h := &FSMHandler{
fsm: p.fsm,
errorCh: make(chan FsmStateReason, 2),
- incoming: incoming,
+ incoming: channels.NewInfiniteChannel(),
outgoing: p.outgoing,
}
diff --git a/server/peer.go b/server/peer.go
index da305894..57994bcd 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -18,6 +18,7 @@ package server
import (
"encoding/json"
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
@@ -228,7 +229,7 @@ func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage,
return nil, nil, eor
}
-func (peer *Peer) startFSMHandler(incoming, stateCh chan *FsmMsg) {
+func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) {
peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing)
}
diff --git a/server/server.go b/server/server.go
index a24cea26..a7245198 100644
--- a/server/server.go
+++ b/server/server.go
@@ -20,6 +20,7 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/armon/go-radix"
+ "github.com/eapache/channels"
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet"
@@ -93,7 +94,7 @@ type BgpServer struct {
addedPeerCh chan config.Neighbor
deletedPeerCh chan config.Neighbor
updatedPeerCh chan config.Neighbor
- fsmincomingCh chan *FsmMsg
+ fsmincomingCh *channels.InfiniteChannel
fsmStateCh chan *FsmMsg
rpkiConfigCh chan []config.RpkiServer
@@ -296,7 +297,7 @@ func (server *BgpServer) Serve() {
}
}
- server.fsmincomingCh = make(chan *FsmMsg, 4096)
+ server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
var senderMsgs []*SenderMsg
@@ -465,8 +466,11 @@ func (server *BgpServer) Serve() {
peer := server.neighborMap[addr]
peer.conf = config
server.setPolicyByConfig(peer.ID(), config.ApplyPolicy)
- case e := <-server.fsmincomingCh:
- handleFsmMsg(e)
+ case e, ok := <-server.fsmincomingCh.Out():
+ if !ok {
+ continue
+ }
+ handleFsmMsg(e.(*FsmMsg))
case e := <-server.fsmStateCh:
handleFsmMsg(e)
case sCh <- firstMsg:
@@ -1180,6 +1184,7 @@ func (server *BgpServer) Shutdown() {
for _, p := range server.neighborMap {
p.fsm.adminStateCh <- ADMIN_STATE_DOWN
}
+ // TODO: call fsmincomingCh.Close()
}
func (server *BgpServer) UpdatePolicy(policy config.RoutingPolicy) {