summaryrefslogtreecommitdiffhomepage
path: root/server/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/peer.go')
-rw-r--r--server/peer.go116
1 files changed, 85 insertions, 31 deletions
diff --git a/server/peer.go b/server/peer.go
index 14b3548d..a96a89fa 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -25,13 +25,14 @@ import (
"github.com/osrg/gobgp/table"
"gopkg.in/tomb.v2"
"net"
- "strings"
+ "strconv"
"time"
)
const (
FSM_CHANNEL_LENGTH = 1024
FLOP_THRESHOLD = time.Second * 30
+ MIN_CONNECT_RETRY = 10
)
type peerMsgType int
@@ -48,14 +49,15 @@ type peerMsg struct {
}
type Peer struct {
- t tomb.Tomb
- globalConfig config.Global
- peerConfig config.Neighbor
- acceptedConnCh chan net.Conn
- serverMsgCh chan *serverMsg
- peerMsgCh chan *peerMsg
- fsm *FSM
- adjRib *table.AdjRib
+ t tomb.Tomb
+ globalConfig config.Global
+ peerConfig config.Neighbor
+ connCh chan net.Conn
+ serverMsgCh chan *serverMsg
+ peerMsgCh chan *peerMsg
+ getActiveCh chan struct{}
+ fsm *FSM
+ adjRib *table.AdjRib
// peer and rib are always not one-to-one so should not be
// here but it's the simplest and works our first target.
rib *table.TableManager
@@ -73,20 +75,21 @@ type Peer struct {
func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer, isGlobalRib bool, policyMap map[string]*policy.Policy) *Peer {
p := &Peer{
- globalConfig: g,
- peerConfig: peer,
- acceptedConnCh: make(chan net.Conn),
- serverMsgCh: serverMsgCh,
- peerMsgCh: peerMsgCh,
- rfMap: make(map[bgp.RouteFamily]bool),
- capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
- isGlobalRib: isGlobalRib,
+ globalConfig: g,
+ peerConfig: peer,
+ connCh: make(chan net.Conn),
+ serverMsgCh: serverMsgCh,
+ peerMsgCh: peerMsgCh,
+ getActiveCh: make(chan struct{}),
+ rfMap: make(map[bgp.RouteFamily]bool),
+ capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+ isGlobalRib: isGlobalRib,
}
p.siblings = make(map[string]*serverMsgDataPeer)
for _, s := range peerList {
p.siblings[s.address.String()] = s
}
- p.fsm = NewFSM(&g, &peer, p.acceptedConnCh)
+ p.fsm = NewFSM(&g, &peer, p.connCh)
peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE)
peer.BgpNeighborCommonState.Downtime = time.Now().Unix()
for _, rf := range peer.AfiSafiList {
@@ -103,6 +106,9 @@ func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg,
p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList)
p.setPolicy(policyMap)
p.t.Go(p.loop)
+ if !peer.TransportOptions.PassiveMode && !isGlobalRib {
+ p.t.Go(p.connectLoop)
+ }
return p
}
@@ -556,6 +562,56 @@ func (peer *Peer) handleServerMsg(m *serverMsg) {
}
}
+func (peer *Peer) connectLoop() error {
+ var tick int
+ if tick = int(peer.fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY {
+ tick = MIN_CONNECT_RETRY
+ }
+
+ ticker := time.NewTicker(time.Duration(tick) * time.Second)
+ ticker.Stop()
+
+ connect := func() {
+ if bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE {
+ var host string
+ addr := peer.peerConfig.NeighborAddress
+
+ if addr.To4() != nil {
+ host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT)
+ } else {
+ host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT)
+ }
+
+ conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second)
+ if err == nil {
+ peer.connCh <- conn
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.peerConfig.NeighborAddress,
+ }).Debugf("failed to connect: %s", err)
+ }
+ }
+ }
+
+ for {
+ select {
+ case <-peer.t.Dying():
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.peerConfig.NeighborAddress,
+ }).Debug("stop connect loop")
+ ticker.Stop()
+ return nil
+ case <-ticker.C:
+ connect()
+ case <-peer.getActiveCh:
+ connect()
+ ticker = time.NewTicker(time.Duration(tick) * time.Second)
+ }
+ }
+}
+
// this goroutine handles routing table operations
func (peer *Peer) loop() error {
for {
@@ -566,14 +622,21 @@ func (peer *Peer) loop() error {
if !peer.isGlobalRib {
h = NewFSMHandler(peer.fsm, incoming, peer.outgoing)
- if peer.peerConfig.BgpNeighborCommonState.State == uint32(bgp.BGP_FSM_ESTABLISHED) {
+ switch peer.peerConfig.BgpNeighborCommonState.State {
+ case uint32(bgp.BGP_FSM_ESTABLISHED):
+ peer.peerConfig.LocalAddress = peer.fsm.LocalAddr()
for rf, _ := range peer.rfMap {
pathList := peer.adjRib.GetOutPathList(rf)
peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
}
peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix()
peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
- } else {
+ case uint32(bgp.BGP_FSM_ACTIVE):
+ if !peer.peerConfig.TransportOptions.PassiveMode {
+ peer.getActiveCh <- struct{}{}
+ }
+ fallthrough
+ default:
peer.fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix()
}
}
@@ -582,7 +645,7 @@ func (peer *Peer) loop() error {
for sameState {
select {
case <-peer.t.Dying():
- close(peer.acceptedConnCh)
+ close(peer.connCh)
peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)
// h.t.Kill(nil) will be called
// internall so even goroutines in
@@ -651,16 +714,7 @@ func (peer *Peer) Stop() error {
}
func (peer *Peer) PassConn(conn *net.TCPConn) {
- localAddr := func(addrPort string) string {
- if strings.Index(addrPort, "[") == -1 {
- return strings.Split(addrPort, ":")[0]
- }
- idx := strings.LastIndex(addrPort, ":")
- return addrPort[1 : idx-1]
- }(conn.LocalAddr().String())
-
- peer.peerConfig.LocalAddress = net.ParseIP(localAddr)
- peer.acceptedConnCh <- conn
+ peer.connCh <- conn
}
func (peer *Peer) MarshalJSON() ([]byte, error) {