summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/fsm.go45
-rw-r--r--server/fsm_test.go32
-rw-r--r--server/peer.go116
-rw-r--r--server/peer_test.go34
4 files changed, 152 insertions, 75 deletions
diff --git a/server/fsm.go b/server/fsm.go
index 50323ef1..d657b404 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -40,6 +40,7 @@ type fsmMsg struct {
const (
HOLDTIME_OPENSENT = 240
+ HOLDTIME_IDLE = 5
)
type AdminState int
@@ -65,8 +66,8 @@ type FSM struct {
peerConfig *config.Neighbor
keepaliveTicker *time.Ticker
state bgp.FSMState
- passiveConn net.Conn
- passiveConnCh chan net.Conn
+ conn net.Conn
+ connCh chan net.Conn
idleHoldTime float64
opensentHoldTime float64
negotiatedHoldTime float64
@@ -127,7 +128,7 @@ func NewFSM(gConfig *config.Global, pConfig *config.Neighbor, connCh chan net.Co
globalConfig: gConfig,
peerConfig: pConfig,
state: bgp.BGP_FSM_IDLE,
- passiveConnCh: connCh,
+ connCh: connCh,
opensentHoldTime: float64(HOLDTIME_OPENSENT),
adminState: ADMIN_STATE_UP,
adminStateCh: make(chan AdminState, 1),
@@ -144,6 +145,18 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) {
fsm.state = nextState
}
+func (fsm *FSM) LocalAddr() net.IP {
+ addr := fsm.conn.LocalAddr()
+ if addr != nil {
+ host, _, err := net.SplitHostPort(addr.String())
+ if err != nil {
+ return nil
+ }
+ return net.ParseIP(host)
+ }
+ return nil
+}
+
func (fsm *FSM) sendNotificatonFromErrorMsg(conn net.Conn, e *bgp.MessageError) {
m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data)
b, _ := m.Serialize()
@@ -211,7 +224,7 @@ func (h *FSMHandler) idle() bgp.FSMState {
select {
case <-h.t.Dying():
return 0
- case conn, ok := <-fsm.passiveConnCh:
+ case conn, ok := <-fsm.connCh:
if !ok {
break
}
@@ -228,7 +241,7 @@ func (h *FSMHandler) idle() bgp.FSMState {
"Key": fsm.peerConfig.NeighborAddress,
"Duration": fsm.idleHoldTime,
}).Debug("IdleHoldTimer expired")
- fsm.idleHoldTime = 0
+ fsm.idleHoldTime = HOLDTIME_IDLE
return bgp.BGP_FSM_ACTIVE
} else {
@@ -258,11 +271,11 @@ func (h *FSMHandler) active() bgp.FSMState {
select {
case <-h.t.Dying():
return 0
- case conn, ok := <-fsm.passiveConnCh:
+ case conn, ok := <-fsm.connCh:
if !ok {
break
}
- fsm.passiveConn = conn
+ fsm.conn = conn
// we don't implement delayed open timer so move to opensent right
// away.
return bgp.BGP_FSM_OPENSENT
@@ -393,11 +406,11 @@ func (h *FSMHandler) opensent() bgp.FSMState {
fsm := h.fsm
m := buildopen(fsm.globalConfig, fsm.peerConfig)
b, _ := m.Serialize()
- fsm.passiveConn.Write(b)
+ fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
h.msgCh = make(chan *fsmMsg)
- h.conn = fsm.passiveConn
+ h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -412,7 +425,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
case <-h.t.Dying():
h.conn.Close()
return 0
- case conn, ok := <-fsm.passiveConnCh:
+ case conn, ok := <-fsm.connCh:
if !ok {
break
}
@@ -440,7 +453,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
h.incoming <- e
msg := bgp.NewBGPKeepAliveMessage()
b, _ := msg.Serialize()
- fsm.passiveConn.Write(b)
+ fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(msg.Header.Type, false)
return bgp.BGP_FSM_OPENCONFIRM
} else {
@@ -489,7 +502,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
fsm := h.fsm
h.msgCh = make(chan *fsmMsg)
- h.conn = fsm.passiveConn
+ h.conn = fsm.conn
h.t.Go(h.recvMessage)
@@ -510,7 +523,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
case <-h.t.Dying():
h.conn.Close()
return 0
- case conn, ok := <-fsm.passiveConnCh:
+ case conn, ok := <-fsm.connCh:
if !ok {
break
}
@@ -523,7 +536,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
m := bgp.NewBGPKeepAliveMessage()
b, _ := m.Serialize()
// TODO: check error
- fsm.passiveConn.Write(b)
+ fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
case e := <-h.msgCh:
switch e.MsgData.(type) {
@@ -667,7 +680,7 @@ func (h *FSMHandler) recvMessageloop() error {
func (h *FSMHandler) established() bgp.FSMState {
fsm := h.fsm
- h.conn = fsm.passiveConn
+ h.conn = fsm.conn
h.t.Go(h.sendMessageloop)
h.msgCh = h.incoming
h.t.Go(h.recvMessageloop)
@@ -683,7 +696,7 @@ func (h *FSMHandler) established() bgp.FSMState {
select {
case <-h.t.Dying():
return 0
- case conn, ok := <-fsm.passiveConnCh:
+ case conn, ok := <-fsm.connCh:
if !ok {
break
}
diff --git a/server/fsm_test.go b/server/fsm_test.go
index 01acf6aa..7e321787 100644
--- a/server/fsm_test.go
+++ b/server/fsm_test.go
@@ -121,6 +121,12 @@ func (m *MockConnection) Close() error {
return nil
}
+func (m *MockConnection) LocalAddr() net.Addr {
+ return &net.TCPAddr{
+ IP: net.ParseIP("10.10.10.10"),
+ Port: bgp.BGP_PORT}
+}
+
func TestReadAll(t *testing.T) {
assert := assert.New(t)
m := NewMockConnection()
@@ -157,7 +163,7 @@ func TestFSMHandlerOpensent_HoldTimerExpired(t *testing.T) {
p, h := makePeerAndHandler()
// push mock connection
- p.fsm.passiveConn = m
+ p.fsm.conn = m
// set up keepalive ticker
sec := time.Second * 1
@@ -183,7 +189,7 @@ func TestFSMHandlerOpenconfirm_HoldTimerExpired(t *testing.T) {
p, h := makePeerAndHandler()
// push mock connection
- p.fsm.passiveConn = m
+ p.fsm.conn = m
// set up keepalive ticker
p.fsm.peerConfig.Timers.KeepaliveInterval = 1
@@ -207,7 +213,7 @@ func TestFSMHandlerEstablish_HoldTimerExpired(t *testing.T) {
p, h := makePeerAndHandler()
// push mock connection
- p.fsm.passiveConn = m
+ p.fsm.conn = m
// set up keepalive ticker
sec := time.Second * 1
@@ -245,7 +251,7 @@ func TestFSMHandlerOpenconfirm_HoldtimeZero(t *testing.T) {
p, h := makePeerAndHandler()
// push mock connection
- p.fsm.passiveConn = m
+ p.fsm.conn = m
// set up keepalive ticker
p.fsm.peerConfig.Timers.KeepaliveInterval = 1
@@ -267,7 +273,7 @@ func TestFSMHandlerEstablished_HoldtimeZero(t *testing.T) {
p, h := makePeerAndHandler()
// push mock connection
- p.fsm.passiveConn = m
+ p.fsm.conn = m
// set up keepalive ticker
sec := time.Second * 1
@@ -288,16 +294,17 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
neighborConfig := config.Neighbor{}
p := &Peer{
- globalConfig: globalConfig,
- peerConfig: neighborConfig,
- acceptedConnCh: make(chan net.Conn),
- serverMsgCh: make(chan *serverMsg),
- peerMsgCh: make(chan *peerMsg),
- capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+ globalConfig: globalConfig,
+ peerConfig: neighborConfig,
+ connCh: make(chan net.Conn),
+ serverMsgCh: make(chan *serverMsg),
+ peerMsgCh: make(chan *peerMsg),
+ getActiveCh: make(chan struct{}),
+ capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
}
p.siblings = make(map[string]*serverMsgDataPeer)
- p.fsm = NewFSM(&globalConfig, &neighborConfig, p.acceptedConnCh)
+ p.fsm = NewFSM(&globalConfig, &neighborConfig, p.connCh)
incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH)
p.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH)
@@ -308,6 +315,7 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
incoming: incoming,
outgoing: p.outgoing,
}
+ p.t.Go(p.connectLoop)
return p, h
diff --git a/server/peer.go b/server/peer.go
index 14b3548d..a96a89fa 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -25,13 +25,14 @@ import (
"github.com/osrg/gobgp/table"
"gopkg.in/tomb.v2"
"net"
- "strings"
+ "strconv"
"time"
)
const (
FSM_CHANNEL_LENGTH = 1024
FLOP_THRESHOLD = time.Second * 30
+ MIN_CONNECT_RETRY = 10
)
type peerMsgType int
@@ -48,14 +49,15 @@ type peerMsg struct {
}
type Peer struct {
- t tomb.Tomb
- globalConfig config.Global
- peerConfig config.Neighbor
- acceptedConnCh chan net.Conn
- serverMsgCh chan *serverMsg
- peerMsgCh chan *peerMsg
- fsm *FSM
- adjRib *table.AdjRib
+ t tomb.Tomb
+ globalConfig config.Global
+ peerConfig config.Neighbor
+ connCh chan net.Conn
+ serverMsgCh chan *serverMsg
+ peerMsgCh chan *peerMsg
+ getActiveCh chan struct{}
+ fsm *FSM
+ adjRib *table.AdjRib
// peer and rib are always not one-to-one so should not be
// here but it's the simplest and works our first target.
rib *table.TableManager
@@ -73,20 +75,21 @@ type Peer struct {
func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer, isGlobalRib bool, policyMap map[string]*policy.Policy) *Peer {
p := &Peer{
- globalConfig: g,
- peerConfig: peer,
- acceptedConnCh: make(chan net.Conn),
- serverMsgCh: serverMsgCh,
- peerMsgCh: peerMsgCh,
- rfMap: make(map[bgp.RouteFamily]bool),
- capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
- isGlobalRib: isGlobalRib,
+ globalConfig: g,
+ peerConfig: peer,
+ connCh: make(chan net.Conn),
+ serverMsgCh: serverMsgCh,
+ peerMsgCh: peerMsgCh,
+ getActiveCh: make(chan struct{}),
+ rfMap: make(map[bgp.RouteFamily]bool),
+ capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+ isGlobalRib: isGlobalRib,
}
p.siblings = make(map[string]*serverMsgDataPeer)
for _, s := range peerList {
p.siblings[s.address.String()] = s
}
- p.fsm = NewFSM(&g, &peer, p.acceptedConnCh)
+ p.fsm = NewFSM(&g, &peer, p.connCh)
peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE)
peer.BgpNeighborCommonState.Downtime = time.Now().Unix()
for _, rf := range peer.AfiSafiList {
@@ -103,6 +106,9 @@ func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg,
p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList)
p.setPolicy(policyMap)
p.t.Go(p.loop)
+ if !peer.TransportOptions.PassiveMode && !isGlobalRib {
+ p.t.Go(p.connectLoop)
+ }
return p
}
@@ -556,6 +562,56 @@ func (peer *Peer) handleServerMsg(m *serverMsg) {
}
}
+func (peer *Peer) connectLoop() error {
+ var tick int
+ if tick = int(peer.fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY {
+ tick = MIN_CONNECT_RETRY
+ }
+
+ ticker := time.NewTicker(time.Duration(tick) * time.Second)
+ ticker.Stop()
+
+ connect := func() {
+ if bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE {
+ var host string
+ addr := peer.peerConfig.NeighborAddress
+
+ if addr.To4() != nil {
+ host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT)
+ } else {
+ host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT)
+ }
+
+ conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second)
+ if err == nil {
+ peer.connCh <- conn
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.peerConfig.NeighborAddress,
+ }).Debugf("failed to connect: %s", err)
+ }
+ }
+ }
+
+ for {
+ select {
+ case <-peer.t.Dying():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.peerConfig.NeighborAddress,
+ }).Debug("stop connect loop")
+ ticker.Stop()
+ return nil
+ case <-ticker.C:
+ connect()
+ case <-peer.getActiveCh:
+ connect()
+ ticker = time.NewTicker(time.Duration(tick) * time.Second)
+ }
+ }
+}
+
// this goroutine handles routing table operations
func (peer *Peer) loop() error {
for {
@@ -566,14 +622,21 @@ func (peer *Peer) loop() error {
if !peer.isGlobalRib {
h = NewFSMHandler(peer.fsm, incoming, peer.outgoing)
- if peer.peerConfig.BgpNeighborCommonState.State == uint32(bgp.BGP_FSM_ESTABLISHED) {
+ switch peer.peerConfig.BgpNeighborCommonState.State {
+ case uint32(bgp.BGP_FSM_ESTABLISHED):
+ peer.peerConfig.LocalAddress = peer.fsm.LocalAddr()
for rf, _ := range peer.rfMap {
pathList := peer.adjRib.GetOutPathList(rf)
peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
}
peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix()
peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
- } else {
+ case uint32(bgp.BGP_FSM_ACTIVE):
+ if !peer.peerConfig.TransportOptions.PassiveMode {
+ peer.getActiveCh <- struct{}{}
+ }
+ fallthrough
+ default:
peer.fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix()
}
}
@@ -582,7 +645,7 @@ func (peer *Peer) loop() error {
for sameState {
select {
case <-peer.t.Dying():
- close(peer.acceptedConnCh)
+ close(peer.connCh)
peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)
// h.t.Kill(nil) will be called
// internall so even goroutines in
@@ -651,16 +714,7 @@ func (peer *Peer) Stop() error {
}
func (peer *Peer) PassConn(conn *net.TCPConn) {
- localAddr := func(addrPort string) string {
- if strings.Index(addrPort, "[") == -1 {
- return strings.Split(addrPort, ":")[0]
- }
- idx := strings.LastIndex(addrPort, ":")
- return addrPort[1 : idx-1]
- }(conn.LocalAddr().String())
-
- peer.peerConfig.LocalAddress = net.ParseIP(localAddr)
- peer.acceptedConnCh <- conn
+ peer.connCh <- conn
}
func (peer *Peer) MarshalJSON() ([]byte, error) {
diff --git a/server/peer_test.go b/server/peer_test.go
index f8703d2c..23588fdc 100644
--- a/server/peer_test.go
+++ b/server/peer_test.go
@@ -120,7 +120,7 @@ func TestPeerAdminShutdownWhileEstablished(t *testing.T) {
go pushPackets()
waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
- peer.acceptedConnCh <- m
+ peer.connCh <- m
waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000)
restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -235,7 +235,7 @@ func TestPeerAdminShutdownWhileOpensent(t *testing.T) {
peer.t.Go(peer.loop)
waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
- peer.acceptedConnCh <- m
+ peer.connCh <- m
waitUntil(assert, bgp.BGP_FSM_OPENSENT, peer, 1000)
restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -280,7 +280,7 @@ func TestPeerAdminShutdownWhileOpenconfirm(t *testing.T) {
}
go pushPackets()
waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
- peer.acceptedConnCh <- m
+ peer.connCh <- m
waitUntil(assert, bgp.BGP_FSM_OPENCONFIRM, peer, 1000)
restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -330,7 +330,7 @@ func TestPeerAdminEnable(t *testing.T) {
go pushPackets()
waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
- peer.acceptedConnCh <- m
+ peer.connCh <- m
waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000)
// shutdown peer at first
@@ -361,7 +361,7 @@ func TestPeerAdminEnable(t *testing.T) {
json.Unmarshal(result.Data, &res)
assert.Equal("ADMIN_STATE_UP", res["result"])
- waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
+ waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, (HOLDTIME_IDLE+1)*1000)
assert.Equal(bgp.BGP_FSM_ACTIVE, peer.fsm.state)
m2 := NewMockConnection()
@@ -373,7 +373,7 @@ func TestPeerAdminEnable(t *testing.T) {
}
go pushPackets()
- peer.acceptedConnCh <- m2
+ peer.connCh <- m2
waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000)
assert.Equal(bgp.BGP_FSM_ESTABLISHED, peer.fsm.state)
@@ -394,7 +394,7 @@ func TestPeerAdminShutdownReject(t *testing.T) {
peer.t.Go(peer.loop)
waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
- peer.acceptedConnCh <- m
+ peer.connCh <- m
waitUntil(assert, bgp.BGP_FSM_OPENSENT, peer, 1000)
restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -451,7 +451,7 @@ func TestPeerSelectSmallerHoldtime(t *testing.T) {
go pushPackets()
waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
- peer.acceptedConnCh <- m
+ peer.connCh <- m
waitUntil(assert, bgp.BGP_FSM_OPENCONFIRM, peer, 1000)
assert.Equal(float64(0), peer.fsm.negotiatedHoldTime)
@@ -500,17 +500,18 @@ func makePeer(globalConfig config.Global, peerConfig config.Neighbor) *Peer {
pch := make(chan *peerMsg, 4096)
p := &Peer{
- globalConfig: globalConfig,
- peerConfig: peerConfig,
- acceptedConnCh: make(chan net.Conn),
- serverMsgCh: sch,
- peerMsgCh: pch,
- rfMap: make(map[bgp.RouteFamily]bool),
- capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+ globalConfig: globalConfig,
+ peerConfig: peerConfig,
+ connCh: make(chan net.Conn),
+ serverMsgCh: sch,
+ peerMsgCh: pch,
+ getActiveCh: make(chan struct{}),
+ rfMap: make(map[bgp.RouteFamily]bool),
+ capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
}
p.siblings = make(map[string]*serverMsgDataPeer)
- p.fsm = NewFSM(&globalConfig, &peerConfig, p.acceptedConnCh)
+ p.fsm = NewFSM(&globalConfig, &peerConfig, p.connCh)
peerConfig.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE)
peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix()
if peerConfig.NeighborAddress.To4() != nil {
@@ -527,6 +528,7 @@ func makePeer(globalConfig config.Global, peerConfig config.Neighbor) *Peer {
rfList := []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}
p.adjRib = table.NewAdjRib(rfList)
p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList)
+ p.t.Go(p.connectLoop)
return p
}