diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-03-06 07:36:20 -0800 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-03-07 00:08:04 -0800 |
commit | 0561c11889edab709a51fff7dc82bf25044f51bb (patch) | |
tree | 60a06d5b2160943096b36952e70f988ba5600cbc | |
parent | 74c79d99a87ce626cb7fee2e97396daa822a2514 (diff) |
server: make rx goroutine reading from socket never sleep
Currently, the rx goroutine reading from socket (recvMessageloop
funciton) sleeps if msgCh is full. The problem is that if the rx
goroutine stops reading from a socket, keepalives are ignored, the
holdtime on gobgp expires even if a peer properly sends keepalives.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | server/fsm.go | 28 | ||||
-rw-r--r-- | server/fsm_test.go | 4 | ||||
-rw-r--r-- | server/peer.go | 3 | ||||
-rw-r--r-- | server/server.go | 13 |
4 files changed, 32 insertions, 16 deletions
diff --git a/server/fsm.go b/server/fsm.go index 01fc01c4..94ee588e 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -18,6 +18,7 @@ package server import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" @@ -356,15 +357,15 @@ type FSMHandler struct { t tomb.Tomb fsm *FSM conn net.Conn - msgCh chan *FsmMsg + msgCh *channels.InfiniteChannel errorCh chan FsmStateReason - incoming chan *FsmMsg + incoming *channels.InfiniteChannel stateCh chan *FsmMsg outgoing chan *bgp.BGPMessage holdTimerResetCh chan bool } -func NewFSMHandler(fsm *FSM, incoming, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { +func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan FsmStateReason, 2), @@ -671,9 +672,10 @@ func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { } func (h *FSMHandler) recvMessage() error { + defer h.msgCh.Close() fmsg, _ := h.recvMessageWithError() if fmsg != nil { - h.msgCh <- fmsg + h.msgCh.In() <- fmsg } return nil } @@ -719,7 +721,7 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.msgCh = make(chan *FsmMsg) + h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -754,7 +756,11 @@ func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } - case e := <-h.msgCh: + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) @@ -885,7 +891,7 @@ func keepaliveTicker(fsm *FSM) *time.Ticker { func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) - h.msgCh = make(chan *FsmMsg) + h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) @@ -929,7 +935,11 @@ func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { // TODO: check error fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - case e := <-h.msgCh: + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) @@ -1067,7 +1077,7 @@ func (h *FSMHandler) recvMessageloop() error { for { fmsg, err := h.recvMessageWithError() if fmsg != nil { - h.msgCh <- fmsg + h.msgCh.In() <- fmsg } if err != nil { return nil diff --git a/server/fsm_test.go b/server/fsm_test.go index 459c4183..2cfce644 100644 --- a/server/fsm_test.go +++ b/server/fsm_test.go @@ -18,6 +18,7 @@ package server import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/table" @@ -293,13 +294,12 @@ func makePeerAndHandler() (*Peer, *FSMHandler) { p.fsm = NewFSM(&gConf, &pConf, table.NewRoutingPolicy()) - incoming := make(chan *FsmMsg, 4096) p.outgoing = make(chan *bgp.BGPMessage, 4096) h := &FSMHandler{ fsm: p.fsm, errorCh: make(chan FsmStateReason, 2), - incoming: incoming, + incoming: channels.NewInfiniteChannel(), outgoing: p.outgoing, } diff --git a/server/peer.go b/server/peer.go index da305894..57994bcd 100644 --- a/server/peer.go +++ b/server/peer.go @@ -18,6 +18,7 @@ package server import ( "encoding/json" log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" @@ -228,7 +229,7 @@ func (peer *Peer) handleBGPmessage(e *FsmMsg) ([]*table.Path, []*bgp.BGPMessage, return nil, nil, eor } -func (peer *Peer) startFSMHandler(incoming, stateCh chan *FsmMsg) { +func (peer *Peer) startFSMHandler(incoming *channels.InfiniteChannel, stateCh chan *FsmMsg) { peer.fsm.h = NewFSMHandler(peer.fsm, incoming, stateCh, peer.outgoing) } diff --git a/server/server.go b/server/server.go index a24cea26..a7245198 100644 --- a/server/server.go +++ b/server/server.go @@ -20,6 +20,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" "github.com/armon/go-radix" + "github.com/eapache/channels" api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet" @@ -93,7 +94,7 @@ type BgpServer struct { addedPeerCh chan config.Neighbor deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor - fsmincomingCh chan *FsmMsg + fsmincomingCh *channels.InfiniteChannel fsmStateCh chan *FsmMsg rpkiConfigCh chan []config.RpkiServer @@ -296,7 +297,7 @@ func (server *BgpServer) Serve() { } } - server.fsmincomingCh = make(chan *FsmMsg, 4096) + server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) var senderMsgs []*SenderMsg @@ -465,8 +466,11 @@ func (server *BgpServer) Serve() { peer := server.neighborMap[addr] peer.conf = config server.setPolicyByConfig(peer.ID(), config.ApplyPolicy) - case e := <-server.fsmincomingCh: - handleFsmMsg(e) + case e, ok := <-server.fsmincomingCh.Out(): + if !ok { + continue + } + handleFsmMsg(e.(*FsmMsg)) case e := <-server.fsmStateCh: handleFsmMsg(e) case sCh <- firstMsg: @@ -1180,6 +1184,7 @@ func (server *BgpServer) Shutdown() { for _, p := range server.neighborMap { p.fsm.adminStateCh <- ADMIN_STATE_DOWN } + // TODO: call fsmincomingCh.Close() } func (server *BgpServer) UpdatePolicy(policy config.RoutingPolicy) { |